diff --git a/router/pkg/pubsub/datasource/pubsubprovider.go b/router/pkg/pubsub/datasource/pubsubprovider.go index 3697229182..e20f1ace2b 100644 --- a/router/pkg/pubsub/datasource/pubsubprovider.go +++ b/router/pkg/pubsub/datasource/pubsubprovider.go @@ -3,6 +3,7 @@ package datasource import ( "context" "fmt" + "slices" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -43,6 +44,10 @@ func (p *PubSubProvider) applyPublishEventHooks(ctx context.Context, cfg Publish for _, hook := range p.hooks.OnPublishEvents { var err error currentEvents, err = hook(ctx, cfg, currentEvents, p.eventBuilder) + currentEvents = slices.DeleteFunc(currentEvents, func(event StreamEvent) bool { + return event == nil + }) + if err != nil { p.Logger.Error( "error applying publish event hooks", diff --git a/router/pkg/pubsub/datasource/pubsubprovider_test.go b/router/pkg/pubsub/datasource/pubsubprovider_test.go index e623de93b0..0bf12e7f60 100644 --- a/router/pkg/pubsub/datasource/pubsubprovider_test.go +++ b/router/pkg/pubsub/datasource/pubsubprovider_test.go @@ -225,9 +225,17 @@ func TestProvider_Publish_WithHooks_Success(t *testing.T) { } originalEvents := []StreamEvent{ &testEvent{mutableTestEvent("original data")}, + &testEvent{mutableTestEvent("original data 2")}, } modifiedEvents := []StreamEvent{ &testEvent{mutableTestEvent("modified data")}, + nil, // should be ignored by publisher + &testEvent{mutableTestEvent("modified data 2")}, + nil, // should be ignored by publisher + } + expectedEvents := []StreamEvent{ + &testEvent{mutableTestEvent("modified data")}, + &testEvent{mutableTestEvent("modified data 2")}, } var eventBuilderExists bool @@ -240,7 +248,7 @@ func TestProvider_Publish_WithHooks_Success(t *testing.T) { return modifiedEvents, nil } - mockAdapter.On("Publish", mock.Anything, config, modifiedEvents).Return(nil) + mockAdapter.On("Publish", mock.Anything, config, expectedEvents).Return(nil) provider := PubSubProvider{ Adapter: mockAdapter, diff --git a/router/pkg/pubsub/datasource/subscription_event_updater.go b/router/pkg/pubsub/datasource/subscription_event_updater.go index 615354ba1a..5ed4a6c837 100644 --- a/router/pkg/pubsub/datasource/subscription_event_updater.go +++ b/router/pkg/pubsub/datasource/subscription_event_updater.go @@ -2,6 +2,7 @@ package datasource import ( "context" + "slices" "sync" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" @@ -86,6 +87,10 @@ func (s *subscriptionEventUpdater) updateSubscription(ctx context.Context, wg *s var err error for i := range hooks { events, err = hooks[i](ctx, s.subscriptionEventConfiguration, s.eventBuilder, events) + events = slices.DeleteFunc(events, func(event StreamEvent) bool { + return event == nil + }) + if err != nil { errCh <- err } diff --git a/router/pkg/pubsub/datasource/subscription_event_updater_test.go b/router/pkg/pubsub/datasource/subscription_event_updater_test.go index 2c0295dd1c..1b6c1bd3a7 100644 --- a/router/pkg/pubsub/datasource/subscription_event_updater_test.go +++ b/router/pkg/pubsub/datasource/subscription_event_updater_test.go @@ -79,6 +79,7 @@ func TestSubscriptionEventUpdater_UpdateSubscription_WithHooks_Success(t *testin } modifiedEvents := []StreamEvent{ &testEvent{mutableTestEvent("modified data")}, + nil, // this should simply be ignored } // Create wrapper function for the mock