Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
38f8826
feat: add subscriptions hook
alepane21 Jul 18, 2025
824362b
chore: add tests of OnSubscriptionStartFn
alepane21 Jul 19, 2025
cdfa9bf
chore: remove duplicated test
alepane21 Jul 19, 2025
f8de362
chore: add test of error with Start method
alepane21 Jul 19, 2025
e500d1c
chore: document OnSubscriptionStartFn
alepane21 Jul 19, 2025
c970c07
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 19, 2025
2a6b5f0
chore: refactores the OnSubscriptionStart hook to be called from the …
alepane21 Jul 22, 2025
1e6ed5f
fix: behaviour when errors happend have been changed by mistake
alepane21 Jul 22, 2025
d776943
chore: change OnSubscriptionStart hook contract
alepane21 Jul 22, 2025
2f34327
chore: hooks now can decide if the subscription has to end
alepane21 Jul 22, 2025
dc89310
chore: events are written directly on the resolveCtx
alepane21 Jul 23, 2025
3cd5690
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 23, 2025
bfad223
chore: improve test
alepane21 Jul 23, 2025
91dbadd
chore: improve names to align with the ones in the ADR of the router
alepane21 Jul 23, 2025
207adf7
chore: execute hooks inside worker, so that we don't block the main l…
alepane21 Jul 23, 2025
3257c52
fix: if only one event was to be skipped, every event was skipped! re…
alepane21 Jul 23, 2025
ec79b06
chore: remove now useless handleTriggerUpdateSubscription
alepane21 Jul 23, 2025
e74d079
chore: send updates on subscription on time (there is still a data ra…
alepane21 Jul 24, 2025
41b92a0
chore: avoid workChan races
alepane21 Jul 24, 2025
7bb7431
chore: lint issue
alepane21 Jul 24, 2025
e0845be
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 29, 2025
3b61ef2
chore: rename SubscriptionOnStartFns to StartupHooks for clarity
alepane21 Jul 29, 2025
ae55c92
Merge remote-tracking branch 'origin/master' into ale/eng-7600-add-su…
alepane21 Jul 29, 2025
87cb3a8
Merge remote-tracking branch 'origin/ale/eng-7600-add-subscriptionons…
alepane21 Jul 29, 2025
ea8ab75
chore: wait for the initial hook execution before starting the dataso…
alepane21 Aug 5, 2025
2ed6134
fix: avoid locking worker goroutine if the trigger was already set up
alepane21 Aug 6, 2025
5fa5a76
chore: add test to verify Resolve beahviour with more than one subscr…
alepane21 Aug 6, 2025
8731ff2
chore: remove return close option
alepane21 Aug 19, 2025
6793f09
chore: HookableSubscriptionDataSource should also be without close
alepane21 Aug 19, 2025
fdf829a
chore: fix initialHooksClose value
alepane21 Aug 19, 2025
7b569b6
chore: simplify hooks implementation
alepane21 Aug 20, 2025
4a94540
chore: remove pinned subscription
alepane21 Aug 20, 2025
a673838
chore: remove unused code, fix naming
alepane21 Aug 20, 2025
46d97b2
chore: fix test
alepane21 Aug 20, 2025
447c103
chore: avoid data source start if the subscription should close
alepane21 Aug 20, 2025
7bb99a5
chore: improve resolve structure
alepane21 Aug 21, 2025
f8c9d06
chore: type
alepane21 Aug 21, 2025
baec1da
chore: readd pinned updater to sub
alepane21 Aug 21, 2025
2100f14
chore: simplify events flow
alepane21 Aug 22, 2025
311b021
chore: fix compile issues
StarpTech Aug 23, 2025
e0af0a0
chore: fix tests
alepane21 Aug 25, 2025
f4eba63
chore: remove pubsub datasource implementations and related tests, it…
alepane21 Aug 25, 2025
0efce3c
test: add new test for handling multiple subscriptions with the same …
alepane21 Aug 25, 2025
6234791
chore: execute startup hooks inside worker go routine
alepane21 Sep 3, 2025
8df1b67
chore: remove parameter not used anymore
alepane21 Sep 3, 2025
a0024f0
chore: add subscription to registry after startup hooks execution
alepane21 Sep 9, 2025
ddc652f
Merge branch 'topic/streams-v1' into ale/eng-7600-add-subscriptionons…
dkorittki Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions v2/pkg/engine/datasource/graphql_datasource/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/federation"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
)
Expand Down Expand Up @@ -103,6 +104,13 @@ type SingleTypeField struct {
FieldName string
}

// OnSubscriptionStartFn defines a hook function that is called when a subscription starts.
// It receives the resolve context and the input of the subscription.
// The function can return a boolean indicating if the subscription should be closed, and an error.
// The error is propagated to the client.
// If close is true, the subscription is closed.
type OnSubscriptionStartFn func(ctx *resolve.Context, input []byte) (close bool, err error)

type SubscriptionConfiguration struct {
URL string
Header http.Header
Expand All @@ -119,6 +127,7 @@ type SubscriptionConfiguration struct {
// these headers by itself.
ForwardedClientHeaderRegularExpressions []RegularExpression
WsSubProtocol string
OnSubscriptionStartFns []OnSubscriptionStartFn
}

type FetchConfiguration struct {
Expand Down
16 changes: 14 additions & 2 deletions v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
return plan.SubscriptionConfiguration{
Input: string(input),
DataSource: &SubscriptionSource{
client: p.subscriptionClient,
client: p.subscriptionClient,
onSubscriptionStartFns: p.config.subscription.OnSubscriptionStartFns,
},
Variables: p.variables,
PostProcessing: DefaultPostProcessingConfiguration,
Expand Down Expand Up @@ -1939,7 +1940,8 @@ type RegularExpression struct {
}

type SubscriptionSource struct {
client GraphQLSubscriptionClient
client GraphQLSubscriptionClient
onSubscriptionStartFns []OnSubscriptionStartFn
}

func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
Expand Down Expand Up @@ -1989,3 +1991,13 @@ func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte,
}
return s.client.UniqueRequestID(ctx, options, xxh)
}

func (s *SubscriptionSource) OnSubscriptionStart(ctx *resolve.Context, input []byte) (close bool, err error) {
for _, fn := range s.onSubscriptionStartFns {
close, err = fn(ctx, input)
Comment thread
alepane21 marked this conversation as resolved.
Outdated
if err != nil || close {
return close, err
}
}
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -4019,7 +4019,7 @@ func TestGraphQLDataSource(t *testing.T) {
Trigger: resolve.GraphQLSubscriptionTrigger{
Input: []byte(`{"url":"wss://swapi.com/graphql","body":{"query":"subscription{remainingJedis}"}}`),
Source: &SubscriptionSource{
NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
PostProcessing: DefaultPostProcessingConfiguration,
},
Expand Down Expand Up @@ -8902,6 +8902,56 @@ func TestSanitizeKey(t *testing.T) {
}
}

func TestSubscriptionSource_OnSubscriptionStart(t *testing.T) {

t.Run("OnSubscriptionStart calls onSubscriptionStartFns", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

startFnCalled := make(chan struct {
ctx *resolve.Context
input []byte
}, 1)
subscriptionSource := SubscriptionSource{
onSubscriptionStartFns: []OnSubscriptionStartFn{
func(ctx *resolve.Context, input []byte) (bool, error) {
startFnCalled <- struct {
ctx *resolve.Context
input []byte
}{ctx, input}
return false, nil
},
},
}

close, err := subscriptionSource.OnSubscriptionStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
require.NoError(t, err)
assert.False(t, close)
require.Len(t, startFnCalled, 1)
called := <-startFnCalled
assert.Equal(t, ctx, called.ctx)
assert.Equal(t, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`), called.input)
})

t.Run("OnSubscriptionStart calls onSubscriptionStartFns and returns error if one of the functions returns an error", func(t *testing.T) {
ctx := resolve.NewContext(context.Background())
defer ctx.Context().Done()

subscriptionSource := SubscriptionSource{
onSubscriptionStartFns: []OnSubscriptionStartFn{
func(ctx *resolve.Context, input []byte) (bool, error) {
return false, errors.New("test error")
},
},
}

close, err := subscriptionSource.OnSubscriptionStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
assert.False(t, close)
require.Error(t, err)
assert.ErrorContains(t, err, "test error")
})
}

const interfaceSelectionSchema = `

scalar String
Expand Down
18 changes: 18 additions & 0 deletions v2/pkg/engine/resolve/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Context struct {
fieldRenderer FieldValueRenderer

subgraphErrors error

subscriptionUpdater func(data []byte)
}

type ExecutionOptions struct {
Expand Down Expand Up @@ -104,6 +106,22 @@ func (c *Context) SetEngineLoaderHooks(hooks LoaderHooks) {
c.LoaderHooks = hooks
}

// SetSubscriptionUpdater add a function that will be called when EmitSubscriptionUpdate is called
// usually it is set by the resolver when a subscription is started, but it can be set to nil to disable the feature
// or set to a different function to extend the behaviour or make it easy to test the subscription update logic
func (c *Context) SetSubscriptionUpdater(fn func(data []byte)) {
c.subscriptionUpdater = fn
}

// EmitSubscriptionUpdate emits a subscription update to the client
// if the emitSubscriptionUpdate function is not set, the update is not sent to the client
// this is used to allow external code to emit updates on this subscription
func (c *Context) EmitSubscriptionUpdate(data []byte) {
if c.subscriptionUpdater != nil {
c.subscriptionUpdater(data)
}
}

type RateLimitOptions struct {
// Enable switches rate limiting on or off
Enable bool
Expand Down
13 changes: 13 additions & 0 deletions v2/pkg/engine/resolve/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,16 @@ type AsyncSubscriptionDataSource interface {
AsyncStop(id uint64)
UniqueRequestID(ctx *Context, input []byte, xxh *xxhash.Digest) (err error)
}

// SubscriptionDataSourceHookable is a hookable interface for subscription data sources.
// It is used to call a function when a subscription is started.
// This is useful for data sources that need to do some work when a subscription is started,
// e.g. to establish a connection to the data source or to emit updates to the client.
// The function is called with the context and the input of the subscription.
// The function is called before the subscription is started and can be used to emit updates to the client.
type SubscriptionDataSourceHookable interface {
// OnSubscriptionStart is called when a new subscription is created
// If close is true, the subscription is closed.
// If an error is returned, the error is propagated to the client.
OnSubscriptionStart(ctx *Context, input []byte) (close bool, err error)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface doesn't reflect the ADR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed the name to be better aligned with the interface described in the ADR. Still it will not be like what the ADR described, because this is the interface on a datasource, but the ADR describe the custom module interface that is in the router (wundergraph/cosmo#2059, still working on it)

}
87 changes: 61 additions & 26 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,16 @@ func (r *Resolver) handleTriggerComplete(triggerID uint64) {
r.completeTrigger(triggerID)
}

// callSubscriptionDataSourceStartHook is used to call the OnSubscriptionStart method of the subscription data source
// if the subscription data source implements the SubscriptionDataSourceHook interface.
// This is used to allow external code to emit updates on this subscription.
func callSubscriptionDataSourceStartHook(ctx *Context, source SubscriptionDataSource, input []byte) (close bool, err error) {
if hook, ok := source.(SubscriptionDataSourceHookable); ok {
return hook.OnSubscriptionStart(ctx, input)
}
return false, nil
}

func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) {
var (
err error
Expand All @@ -590,6 +600,12 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
s.heartbeat = true
}

// Set the subscription updater to the resolver to allow external code to emit updates
// on this subscription
add.ctx.SetSubscriptionUpdater(func(data []byte) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look right. Events must be emitted through the events channel. This can cause cause issues because the writer is not synchronised.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so now this has been changed. I'm running the hooks inside the worker to avoid slowing the main loop to much. The updates are then sent directly using the executeSubscriptionUpdate, without using the events (that would have been complex, because events are going to be sent to all the subscriptions of a trigger, but I want them to be sent only to the current subscription) or even the workChan, to avoid to block the subscription worker if a modules return more messages than the buffer size.

r.handleTriggerUpdateSubscription(add.ctx, s, data)
})

// Start the dedicated worker goroutine where the subscription updates are processed
// and writes are written to the client in a single threaded manner
go s.startWorker()
Expand All @@ -603,6 +619,13 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:added:%d:%d\n", triggerID, add.id.SubscriptionID)
}
close, errStartHook := callSubscriptionDataSourceStartHook(add.ctx, add.resolve.Trigger.Source, add.input)
if errStartHook != nil {
r.asyncErrorWriter.WriteError(add.ctx, errStartHook, add.resolve.Response, add.writer)
}
if close {
r.closeTrigger(triggerID, SubscriptionCloseKindNormal)
}
return
}

Expand Down Expand Up @@ -639,6 +662,15 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
asyncDataSource = async
}

close, errStartHook := callSubscriptionDataSourceStartHook(add.ctx, add.resolve.Trigger.Source, add.input)
if errStartHook != nil {
r.asyncErrorWriter.WriteError(add.ctx, errStartHook, add.resolve.Response, add.writer)
}
if close {
r.closeTrigger(triggerID, SubscriptionCloseKindNormal)
return
}

go func() {
if r.options.Debug {
fmt.Printf("resolver:trigger:start:%d\n", triggerID)
Expand Down Expand Up @@ -757,6 +789,34 @@ func (r *Resolver) handleRemoveClient(id int64) {
}
}

func (r *Resolver) handleTriggerUpdateSubscription(ctx *Context, sub *sub, data []byte) {
if err := ctx.ctx.Err(); err != nil {
return // no need to schedule an event update when the client already disconnected
}
skip, err := sub.resolve.Filter.SkipEvent(ctx, data, r.triggerUpdateBuf)
if err != nil {
r.asyncErrorWriter.WriteError(ctx, err, sub.resolve.Response, sub.writer)
return
}
if skip {
return
}

fn := func() {
r.executeSubscriptionUpdate(ctx, sub, data)
}

select {
case <-r.ctx.Done():
// Skip sending all events if the resolver is shutting down
return
case <-ctx.ctx.Done():
// Skip sending the event if the client disconnected
case sub.workChan <- workItem{fn, false}:
// Send the event to the subscription worker
}
}

func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
trig, ok := r.triggers[id]
if !ok {
Expand All @@ -767,32 +827,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
}

for c, s := range trig.subscriptions {
Comment thread
alepane21 marked this conversation as resolved.
c, s := c, s
if err := c.ctx.Err(); err != nil {
continue // no need to schedule an event update when the client already disconnected
}
skip, err := s.resolve.Filter.SkipEvent(c, data, r.triggerUpdateBuf)
if err != nil {
r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer)
continue
}
if skip {
continue
}

fn := func() {
r.executeSubscriptionUpdate(c, s, data)
}

select {
case <-r.ctx.Done():
// Skip sending all events if the resolver is shutting down
return
case <-c.ctx.Done():
// Skip sending the event if the client disconnected
case s.workChan <- workItem{fn, false}:
// Send the event to the subscription worker
}
r.handleTriggerUpdateSubscription(c, s, data)
}
}

Expand Down
Loading
Loading