Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
38f8826
feat: add subscriptions hook
alepane21 Jul 18, 2025
824362b
chore: add tests of OnSubscriptionStartFn
alepane21 Jul 19, 2025
cdfa9bf
chore: remove duplicated test
alepane21 Jul 19, 2025
f8de362
chore: add test of error with Start method
alepane21 Jul 19, 2025
e500d1c
chore: document OnSubscriptionStartFn
alepane21 Jul 19, 2025
c970c07
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 19, 2025
2a6b5f0
chore: refactores the OnSubscriptionStart hook to be called from the …
alepane21 Jul 22, 2025
1e6ed5f
fix: behaviour when errors happend have been changed by mistake
alepane21 Jul 22, 2025
d776943
chore: change OnSubscriptionStart hook contract
alepane21 Jul 22, 2025
2f34327
chore: hooks now can decide if the subscription has to end
alepane21 Jul 22, 2025
dc89310
chore: events are written directly on the resolveCtx
alepane21 Jul 23, 2025
3cd5690
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 23, 2025
bfad223
chore: improve test
alepane21 Jul 23, 2025
91dbadd
chore: improve names to align with the ones in the ADR of the router
alepane21 Jul 23, 2025
207adf7
chore: execute hooks inside worker, so that we don't block the main l…
alepane21 Jul 23, 2025
3257c52
fix: if only one event was to be skipped, every event was skipped! re…
alepane21 Jul 23, 2025
ec79b06
chore: remove now useless handleTriggerUpdateSubscription
alepane21 Jul 23, 2025
e74d079
chore: send updates on subscription on time (there is still a data ra…
alepane21 Jul 24, 2025
41b92a0
chore: avoid workChan races
alepane21 Jul 24, 2025
7bb7431
chore: lint issue
alepane21 Jul 24, 2025
e0845be
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 29, 2025
3b61ef2
chore: rename SubscriptionOnStartFns to StartupHooks for clarity
alepane21 Jul 29, 2025
ae55c92
Merge remote-tracking branch 'origin/master' into ale/eng-7600-add-su…
alepane21 Jul 29, 2025
87cb3a8
Merge remote-tracking branch 'origin/ale/eng-7600-add-subscriptionons…
alepane21 Jul 29, 2025
ea8ab75
chore: wait for the initial hook execution before starting the dataso…
alepane21 Aug 5, 2025
2ed6134
fix: avoid locking worker goroutine if the trigger was already set up
alepane21 Aug 6, 2025
5fa5a76
chore: add test to verify Resolve beahviour with more than one subscr…
alepane21 Aug 6, 2025
8731ff2
chore: remove return close option
alepane21 Aug 19, 2025
6793f09
chore: HookableSubscriptionDataSource should also be without close
alepane21 Aug 19, 2025
fdf829a
chore: fix initialHooksClose value
alepane21 Aug 19, 2025
7b569b6
chore: simplify hooks implementation
alepane21 Aug 20, 2025
4a94540
chore: remove pinned subscription
alepane21 Aug 20, 2025
a673838
chore: remove unused code, fix naming
alepane21 Aug 20, 2025
46d97b2
chore: fix test
alepane21 Aug 20, 2025
447c103
chore: avoid data source start if the subscription should close
alepane21 Aug 20, 2025
7bb99a5
chore: improve resolve structure
alepane21 Aug 21, 2025
f8c9d06
chore: type
alepane21 Aug 21, 2025
baec1da
chore: readd pinned updater to sub
alepane21 Aug 21, 2025
2100f14
chore: simplify events flow
alepane21 Aug 22, 2025
311b021
chore: fix compile issues
StarpTech Aug 23, 2025
e0af0a0
chore: fix tests
alepane21 Aug 25, 2025
f4eba63
chore: remove pubsub datasource implementations and related tests, it…
alepane21 Aug 25, 2025
0efce3c
test: add new test for handling multiple subscriptions with the same …
alepane21 Aug 25, 2025
6234791
chore: execute startup hooks inside worker go routine
alepane21 Sep 3, 2025
8df1b67
chore: remove parameter not used anymore
alepane21 Sep 3, 2025
a0024f0
chore: add subscription to registry after startup hooks execution
alepane21 Sep 9, 2025
ddc652f
Merge branch 'topic/streams-v1' into ale/eng-7600-add-subscriptionons…
dkorittki Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions v2/pkg/engine/datasource/graphql_datasource/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/federation"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
)
Expand Down Expand Up @@ -103,6 +104,11 @@ type SingleTypeField struct {
FieldName string
}

// OnSubscriptionStartFn defines a hook function that is called when a subscription starts.
// It receives the resolve context and can return initial events or an error.
// If an error is returned, the subscription will not start.
type OnSubscriptionStartFn func(ctx *resolve.Context) ([][]byte, error)
Comment thread
alepane21 marked this conversation as resolved.
Outdated

type SubscriptionConfiguration struct {
URL string
Header http.Header
Expand All @@ -119,6 +125,7 @@ type SubscriptionConfiguration struct {
// these headers by itself.
ForwardedClientHeaderRegularExpressions []RegularExpression
WsSubProtocol string
OnSubscriptionStartFns []OnSubscriptionStartFn
}

type FetchConfiguration struct {
Expand Down
24 changes: 22 additions & 2 deletions v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
return plan.SubscriptionConfiguration{
Input: string(input),
DataSource: &SubscriptionSource{
client: p.subscriptionClient,
client: p.subscriptionClient,
onSubscriptionStartFns: p.config.subscription.OnSubscriptionStartFns,
},
Variables: p.variables,
PostProcessing: DefaultPostProcessingConfiguration,
Expand Down Expand Up @@ -1932,7 +1933,8 @@ type RegularExpression struct {
}

type SubscriptionSource struct {
client GraphQLSubscriptionClient
client GraphQLSubscriptionClient
onSubscriptionStartFns []OnSubscriptionStartFn
}

func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
Expand All @@ -1944,6 +1946,15 @@ func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input [
if options.Body.Query == "" {
return resolve.ErrUnableToResolve
}
for _, fn := range s.onSubscriptionStartFns {
events, err := fn(ctx)
if err != nil {
return err
}
for _, event := range events {
updater.Update(event)
Comment thread
jensneuse marked this conversation as resolved.
Outdated
}
}
return s.client.SubscribeAsync(ctx, id, options, updater)
}

Expand All @@ -1963,6 +1974,15 @@ func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, updater r
if options.Body.Query == "" {
return resolve.ErrUnableToResolve
}
for _, fn := range s.onSubscriptionStartFns {
Comment thread
jensneuse marked this conversation as resolved.
Outdated
events, err := fn(ctx)
if err != nil {
return err
}
for _, event := range events {
updater.Update(event)
}
}
return s.client.Subscribe(ctx, options, updater)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4019,7 +4019,7 @@ func TestGraphQLDataSource(t *testing.T) {
Trigger: resolve.GraphQLSubscriptionTrigger{
Input: []byte(`{"url":"wss://swapi.com/graphql","body":{"query":"subscription{remainingJedis}"}}`),
Source: &SubscriptionSource{
NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
PostProcessing: DefaultPostProcessingConfiguration,
},
Expand Down Expand Up @@ -8902,6 +8902,174 @@ func TestSanitizeKey(t *testing.T) {
}
}

func TestSubscriptionSource_OnSubscriptionStartFns(t *testing.T) {
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
defer chatServer.Close()

sendChatMessage := func(t *testing.T, username, message string) {
time.Sleep(200 * time.Millisecond)
httpClient := http.Client{}
req, err := http.NewRequest(
http.MethodPost,
chatServer.URL,
bytes.NewBufferString(fmt.Sprintf(`{"variables": {}, "operationName": "SendMessage", "query": "mutation SendMessage { post(roomName: \"#test\", username: \"%s\", text: \"%s\") { id } }"}`, username, message)),
)
require.NoError(t, err)

req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}

chatServerSubscriptionOptions := func(t *testing.T, body string) []byte {
var gqlBody GraphQLBody
_ = json.Unmarshal([]byte(body), &gqlBody)
options := GraphQLSubscriptionOptions{
URL: chatServer.URL,
Body: gqlBody,
Header: nil,
}

optionsBytes, err := json.Marshal(options)
require.NoError(t, err)

return optionsBytes
}

newSubscriptionSource := func(ctx context.Context) SubscriptionSource {
httpClient := http.Client{}
subscriptionSource := SubscriptionSource{client: NewGraphQLSubscriptionClient(&httpClient, http.DefaultClient, ctx)}
return subscriptionSource
}

t.Run("should call the onSubscriptionStartFns at subscription source Start", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

updater := &testSubscriptionUpdater{}

startFnCalled := make(chan *resolve.Context, 1)
Comment thread
alepane21 marked this conversation as resolved.
Outdated
source := newSubscriptionSource(ctx.Context())
source.onSubscriptionStartFns = []OnSubscriptionStartFn{
func(ctx *resolve.Context) ([][]byte, error) {
startFnCalled <- ctx
return nil, nil
},
}
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)

err := source.Start(ctx, chatSubscriptionOptions, updater)
require.NoError(t, err)

username := "myuser"
message := "hello world!"
go sendChatMessage(t, username, message)
updater.AwaitUpdates(t, time.Second, 1)
assert.Len(t, updater.updates, 1)
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])

select {
case <-startFnCalled:
case <-time.After(time.Second):
t.Fatalf("start fn not called")
}
})

t.Run("should call the onSubscriptionStartFns at subscription source AsyncStart", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

updater := &testSubscriptionUpdater{}

startFnCalled := make(chan *resolve.Context, 1)
source := newSubscriptionSource(ctx.Context())
source.onSubscriptionStartFns = []OnSubscriptionStartFn{
func(ctx *resolve.Context) ([][]byte, error) {
startFnCalled <- ctx
return nil, nil
},
}
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)

err := source.AsyncStart(ctx, 0, chatSubscriptionOptions, updater)
require.NoError(t, err)

username := "myuser"
message := "hello world!"
go sendChatMessage(t, username, message)
updater.AwaitUpdates(t, time.Second, 1)
assert.Len(t, updater.updates, 1)
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])

select {
case <-startFnCalled:
case <-time.After(time.Second):
t.Fatalf("start fn not called")
}
})

t.Run("if the onSubscriptionStartFns returns an error, the subscription should not async start and return the error", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

updater := &testSubscriptionUpdater{}

source := newSubscriptionSource(ctx.Context())
source.onSubscriptionStartFns = []OnSubscriptionStartFn{
func(ctx *resolve.Context) ([][]byte, error) {
return nil, errors.New("test error")
},
}
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)

err := source.AsyncStart(ctx, 0, chatSubscriptionOptions, updater)
require.Error(t, err)
require.ErrorContains(t, err, "test error")
assert.Len(t, updater.updates, 0)
})

t.Run("if the onSubscriptionStartFns returns an error, the subscription should not start and return the error", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

updater := &testSubscriptionUpdater{}

source := newSubscriptionSource(ctx.Context())
source.onSubscriptionStartFns = []OnSubscriptionStartFn{
func(ctx *resolve.Context) ([][]byte, error) {
return nil, errors.New("test error")
},
}
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)

err := source.Start(ctx, chatSubscriptionOptions, updater)
require.Error(t, err)
require.ErrorContains(t, err, "test error")
assert.Len(t, updater.updates, 0)
})

t.Run("if the onSubscriptionStartFns returns a message, the message should be sent to the client", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

updater := &testSubscriptionUpdater{}

source := newSubscriptionSource(ctx.Context())
source.onSubscriptionStartFns = []OnSubscriptionStartFn{
func(ctx *resolve.Context) ([][]byte, error) {
return [][]byte{[]byte(`{"data":{"messageAdded":{"text":"hello world!","createdBy":"injection"}}}`)}, nil
Comment thread
jensneuse marked this conversation as resolved.
Outdated
},
}
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)

err := source.AsyncStart(ctx, 0, chatSubscriptionOptions, updater)
require.NoError(t, err)
assert.Len(t, updater.updates, 1)
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"injection"}}}`, updater.updates[0])
})
}

const interfaceSelectionSchema = `

scalar String
Expand Down
Loading