diff --git a/router-tests/modules/stream_receive_test.go b/router-tests/modules/stream_receive_test.go index 23477f3925..325600915d 100644 --- a/router-tests/modules/stream_receive_test.go +++ b/router-tests/modules/stream_receive_test.go @@ -1,8 +1,8 @@ package module_test import ( + "encoding/json" "errors" - "fmt" "net/http" "sync/atomic" "testing" @@ -522,207 +522,6 @@ func TestReceiveHook(t *testing.T) { }) }) - t.Run("Test error deduplication with multiple subscriptions", func(t *testing.T) { - t.Parallel() - - cfg := config.Config{ - Graph: config.Graph{}, - Modules: map[string]interface{}{ - "streamReceiveModule": stream_receive.StreamReceiveModule{ - Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) { - return datasource.NewStreamEvents(nil), errors.New("deduplicated error") - }, - }, - }, - } - - testenv.Run(t, &testenv.Config{ - RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, - EnableKafka: true, - RouterOptions: []core.Option{ - core.WithModulesConfig(cfg.Modules), - core.WithCustomModules(&stream_receive.StreamReceiveModule{}), - }, - LogObservation: testenv.LogObservationConfig{ - Enabled: true, - LogLevel: zapcore.ErrorLevel, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - topics := []string{"employeeUpdated"} - events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...) - - var subscriptionOne struct { - employeeUpdatedMyKafka struct { - ID float64 `graphql:"id"` - Details struct { - Forename string `graphql:"forename"` - Surname string `graphql:"surname"` - } `graphql:"details"` - } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` - } - - surl := xEnv.GraphQLWebSocketSubscriptionURL() - - // Create 3 subscriptions that will all receive the same error - clients := make([]*graphql.SubscriptionClient, 3) - clientRunChs := make([]chan error, 3) - - for i := range 3 { - clients[i] = graphql.NewSubscriptionClient(surl) - clientRunChs[i] = make(chan error) - - subscriptionID, err := clients[i].Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { - return nil - }) - require.NoError(t, err) - require.NotEmpty(t, subscriptionID) - - go func() { - clientRunChs[i] <- clients[i].Run() - }() - } - - // Wait for all subscriptions to be established - xEnv.WaitForSubscriptionCount(3, Timeout) - - // Produce a message that will trigger the error in all handlers - events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) - - // Wait for all subscriptions to be closed due to the error - xEnv.WaitForSubscriptionCount(0, Timeout) - - // Verify all clients completed - for i := 0; i < 3; i++ { - testenv.AwaitChannelWithT(t, Timeout, clientRunChs[i], func(t *testing.T, err error) { - require.NoError(t, err) - }, "client should have completed when server closed connection") - } - - xEnv.WaitForTriggerCount(0, Timeout) - - // Verify error deduplication: should see only one error log entry - errorLogs := xEnv.Observer().FilterMessage("some handlers have thrown an error") - assert.Len(t, errorLogs.All(), 1, "should have exactly one deduplicated error log entry") - - // Verify the error log contains the correct error message and count - if len(errorLogs.All()) > 0 { - logEntry := errorLogs.All()[0] - fields := logEntry.ContextMap() - - assert.Equal(t, "deduplicated error", fields["error"], "error message should match") - assert.Equal(t, int64(3), fields["amount_handlers"], "should count all 3 handlers that threw the error") - } - }) - }) - - t.Run("Test unique error messages are all logged", func(t *testing.T) { - t.Parallel() - - var errorCounter atomic.Int32 - - cfg := config.Config{ - Graph: config.Graph{}, - Modules: map[string]interface{}{ - "streamReceiveModule": stream_receive.StreamReceiveModule{ - Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) { - count := errorCounter.Add(1) - return datasource.NewStreamEvents(nil), fmt.Errorf("unique error %d", count) - }, - }, - }, - } - - testenv.Run(t, &testenv.Config{ - RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, - EnableKafka: true, - RouterOptions: []core.Option{ - core.WithModulesConfig(cfg.Modules), - core.WithCustomModules(&stream_receive.StreamReceiveModule{}), - }, - LogObservation: testenv.LogObservationConfig{ - Enabled: true, - LogLevel: zapcore.ErrorLevel, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - topics := []string{"employeeUpdated"} - events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...) - - var subscriptionOne struct { - employeeUpdatedMyKafka struct { - ID float64 `graphql:"id"` - Details struct { - Forename string `graphql:"forename"` - Surname string `graphql:"surname"` - } `graphql:"details"` - } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` - } - - surl := xEnv.GraphQLWebSocketSubscriptionURL() - - // Create 3 subscriptions that will each receive a unique error - clients := make([]*graphql.SubscriptionClient, 3) - clientRunChs := make([]chan error, 3) - - for i := range 3 { - clients[i] = graphql.NewSubscriptionClient(surl) - clientRunChs[i] = make(chan error) - - subscriptionID, err := clients[i].Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { - return nil - }) - require.NoError(t, err) - require.NotEmpty(t, subscriptionID) - - go func() { - clientRunChs[i] <- clients[i].Run() - }() - } - - // Wait for all subscriptions to be established - xEnv.WaitForSubscriptionCount(3, Timeout) - - // Produce a message that will trigger a unique error in each handler - events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) - - // Wait for all subscriptions to be closed due to the error - xEnv.WaitForSubscriptionCount(0, Timeout) - - // Verify all clients completed - for i := range 3 { - testenv.AwaitChannelWithT(t, Timeout, clientRunChs[i], func(t *testing.T, err error) { - require.NoError(t, err) - }, "client should have completed when server closed connection") - } - - xEnv.WaitForTriggerCount(0, Timeout) - - // Verify no deduplication: should see three error log entries (one for each unique error) - errorLogs := xEnv.Observer().FilterMessage("some handlers have thrown an error") - assert.Len(t, errorLogs.All(), 3, "should have three separate error log entries for unique errors") - - // Verify each error log contains a unique error message and count of 1 - if len(errorLogs.All()) == 3 { - var errorMessages []string - for _, logEntry := range errorLogs.All() { - fields := logEntry.ContextMap() - errorMsg, ok := fields["error"].(string) - require.True(t, ok, "error field should be a string") - - // Check that error message is unique (starts with "unique error") - assert.Contains(t, errorMsg, "unique error", "error message should contain 'unique error'") - assert.NotContains(t, errorMessages, errorMsg, "each error message should be unique") - errorMessages = append(errorMessages, errorMsg) - - // Each unique error should have been thrown by exactly 1 handler - assert.Equal(t, int64(1), fields["amount_handlers"], "each unique error should have amount_handlers = 1") - } - - // Verify we got exactly 3 unique error messages - assert.Len(t, errorMessages, 3, "should have exactly 3 unique error messages") - } - }) - }) - t.Run("Test concurrent handler execution works", func(t *testing.T) { t.Parallel() @@ -813,7 +612,9 @@ func TestReceiveHook(t *testing.T) { core.WithModulesConfig(cfg.Modules), core.WithCustomModules(&stream_receive.StreamReceiveModule{}), core.WithSubscriptionHooks(config.SubscriptionHooksConfiguration{ - MaxConcurrentEventReceiveHandlers: tc.maxConcurrent, + OnReceiveEvents: config.OnReceiveEventsConfiguration{ + MaxConcurrentHandlers: tc.maxConcurrent, + }, }), }, LogObservation: testenv.LogObservationConfig{ @@ -893,4 +694,127 @@ func TestReceiveHook(t *testing.T) { }) } }) + + t.Run("Test timeout mechanism allows out-of-order event delivery", func(t *testing.T) { + t.Parallel() + + // One subscriber receives three consecutive events. + // The first event's hook is delayed, exceeding the timeout. + // The second and third events' hooks process immediately without delay. + // Because the first hook exceeds the timeout, the system abandons waiting for it + // and processes the second and third events. + // The first event will be delivered later when its hook finally completes. + // This should result in event order [2, 3, 1] at the client. + + hookDelay := 500 * time.Millisecond + hookTimeout := 100 * time.Millisecond + + var callCount atomic.Int32 + + cfg := config.Config{ + Graph: config.Graph{}, + Modules: map[string]interface{}{ + "streamReceiveModule": stream_receive.StreamReceiveModule{ + Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) { + // Only the first call should delay + if callCount.Add(1) == 1 { + time.Sleep(hookDelay) + } + return events, nil + }, + }, + }, + } + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + RouterOptions: []core.Option{ + core.WithModulesConfig(cfg.Modules), + core.WithCustomModules(&stream_receive.StreamReceiveModule{}), + core.WithSubscriptionHooks(config.SubscriptionHooksConfiguration{ + OnReceiveEvents: config.OnReceiveEventsConfiguration{ + MaxConcurrentHandlers: 3, + HandlerTimeout: hookTimeout, + }, + }), + }, + LogObservation: testenv.LogObservationConfig{ + Enabled: true, + LogLevel: zapcore.InfoLevel, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + topics := []string{"employeeUpdated"} + events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...) + + var subscriptionOne struct { + employeeUpdatedMyKafka struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` + } + + surl := xEnv.GraphQLWebSocketSubscriptionURL() + client := graphql.NewSubscriptionClient(surl) + + subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 3) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- kafkaSubscriptionArgs{ + dataValue: dataValue, + errValue: errValue, + } + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + + clientRunCh := make(chan error) + go func() { + clientRunCh <- client.Run() + }() + + xEnv.WaitForSubscriptionCount(1, Timeout) + + events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"first"}}`) + events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 2,"update":{"name":"second"}}`) + events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 3,"update":{"name":"third"}}`) + + // Collect all 3 events + receivedIDs := make([]float64, 0, 3) + for i := 0; i < 3; i++ { + testenv.AwaitChannelWithT(t, Timeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { + require.NoError(t, args.errValue) + + var response struct { + EmployeeUpdatedMyKafka struct { + ID float64 `json:"id"` + } `json:"employeeUpdatedMyKafka"` + } + err := json.Unmarshal(args.dataValue, &response) + require.NoError(t, err) + receivedIDs = append(receivedIDs, response.EmployeeUpdatedMyKafka.ID) + }) + } + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, Timeout, clientRunCh, func(t *testing.T, err error) { + require.NoError(t, err) + }, "unable to close client before timeout") + + // Verify events arrived out of order: event 1 should be the last one to arrive + assert.ElementsMatch(t, []float64{1, 2, 3}, receivedIDs, "expected to receive all events") + assert.Equal(t, float64(1), receivedIDs[len(receivedIDs)-1], "expected the delayed event to arrive last") + assert.NotEqual(t, float64(1), receivedIDs[0], "expected at least one later event to arrive before the delayed one") + + timeoutLog := xEnv.Observer().FilterMessage("Timeout exceeded during subscription updates, events may arrive out of order") + assert.Len(t, timeoutLog.All(), 1, "expected timeout warning to be logged") + + // Verify all hooks were executed + hookLog := xEnv.Observer().FilterMessage("Stream Hook has been run") + assert.Len(t, hookLog.All(), 3) + }) + }) } diff --git a/router/core/factoryresolver.go b/router/core/factoryresolver.go index 36ea68d8e4..d8cfecc283 100644 --- a/router/core/factoryresolver.go +++ b/router/core/factoryresolver.go @@ -418,8 +418,8 @@ func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nod } } - subscriptionOnStartFns := make([]graphql_datasource.SubscriptionOnStartFn, len(l.subscriptionHooks.onStart)) - for i, fn := range l.subscriptionHooks.onStart { + subscriptionOnStartFns := make([]graphql_datasource.SubscriptionOnStartFn, len(l.subscriptionHooks.onStart.handlers)) + for i, fn := range l.subscriptionHooks.onStart.handlers { subscriptionOnStartFns[i] = NewEngineSubscriptionOnStartHook(fn) } customConfiguration, err := graphql_datasource.NewConfiguration(graphql_datasource.ConfigurationInput{ @@ -477,18 +477,18 @@ func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nod } } - subscriptionOnStartFns := make([]pubsub_datasource.SubscriptionOnStartFn, len(l.subscriptionHooks.onStart)) - for i, fn := range l.subscriptionHooks.onStart { + subscriptionOnStartFns := make([]pubsub_datasource.SubscriptionOnStartFn, len(l.subscriptionHooks.onStart.handlers)) + for i, fn := range l.subscriptionHooks.onStart.handlers { subscriptionOnStartFns[i] = NewPubSubSubscriptionOnStartHook(fn) } - onPublishEventsFns := make([]pubsub_datasource.OnPublishEventsFn, len(l.subscriptionHooks.onPublishEvents)) - for i, fn := range l.subscriptionHooks.onPublishEvents { + onPublishEventsFns := make([]pubsub_datasource.OnPublishEventsFn, len(l.subscriptionHooks.onPublishEvents.handlers)) + for i, fn := range l.subscriptionHooks.onPublishEvents.handlers { onPublishEventsFns[i] = NewPubSubOnPublishEventsHook(fn) } - onReceiveEventsFns := make([]pubsub_datasource.OnReceiveEventsFn, len(l.subscriptionHooks.onReceiveEvents)) - for i, fn := range l.subscriptionHooks.onReceiveEvents { + onReceiveEventsFns := make([]pubsub_datasource.OnReceiveEventsFn, len(l.subscriptionHooks.onReceiveEvents.handlers)) + for i, fn := range l.subscriptionHooks.onReceiveEvents.handlers { onReceiveEventsFns[i] = NewPubSubOnReceiveEventsHook(fn) } @@ -501,10 +501,17 @@ func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nod l.resolver.InstanceData().HostName, l.resolver.InstanceData().ListenAddress, pubsub_datasource.Hooks{ - SubscriptionOnStart: subscriptionOnStartFns, - OnReceiveEvents: onReceiveEventsFns, - OnPublishEvents: onPublishEventsFns, - MaxConcurrentOnReceiveHandlers: l.subscriptionHooks.maxConcurrentOnReceiveHooks, + SubscriptionOnStart: pubsub_datasource.SubscriptionOnStartHooks{ + Handlers: subscriptionOnStartFns, + }, + OnPublishEvents: pubsub_datasource.OnPublishEventsHooks{ + Handlers: onPublishEventsFns, + }, + OnReceiveEvents: pubsub_datasource.OnReceiveEventsHooks{ + Handlers: onReceiveEventsFns, + MaxConcurrentHandlers: l.subscriptionHooks.onReceiveEvents.maxConcurrentHandlers, + Timeout: l.subscriptionHooks.onReceiveEvents.timeout, + }, }, ) if err != nil { diff --git a/router/core/router.go b/router/core/router.go index 8ebac78a5b..20b0b33ae6 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -253,9 +253,12 @@ func NewRouter(opts ...Option) (*Router, error) { r.metricConfig = rmetric.DefaultConfig(Version) } - // Default value for maxConcurrentOnReceiveHooks - if r.subscriptionHooks.maxConcurrentOnReceiveHooks == 0 { - r.subscriptionHooks.maxConcurrentOnReceiveHooks = 100 + if r.subscriptionHooks.onReceiveEvents.maxConcurrentHandlers == 0 { + r.subscriptionHooks.onReceiveEvents.maxConcurrentHandlers = 100 + } + + if r.subscriptionHooks.onReceiveEvents.timeout == 0 { + r.subscriptionHooks.onReceiveEvents.timeout = 5 * time.Second } if r.corsOptions == nil { @@ -681,15 +684,15 @@ func (r *Router) initModules(ctx context.Context) error { } if handler, ok := moduleInstance.(SubscriptionOnStartHandler); ok { - r.subscriptionHooks.onStart = append(r.subscriptionHooks.onStart, handler.SubscriptionOnStart) + r.subscriptionHooks.onStart.handlers = append(r.subscriptionHooks.onStart.handlers, handler.SubscriptionOnStart) } if handler, ok := moduleInstance.(StreamPublishEventHandler); ok { - r.subscriptionHooks.onPublishEvents = append(r.subscriptionHooks.onPublishEvents, handler.OnPublishEvents) + r.subscriptionHooks.onPublishEvents.handlers = append(r.subscriptionHooks.onPublishEvents.handlers, handler.OnPublishEvents) } if handler, ok := moduleInstance.(StreamReceiveEventHandler); ok { - r.subscriptionHooks.onReceiveEvents = append(r.subscriptionHooks.onReceiveEvents, handler.OnReceiveEvents) + r.subscriptionHooks.onReceiveEvents.handlers = append(r.subscriptionHooks.onReceiveEvents.handlers, handler.OnReceiveEvents) } r.modules = append(r.modules, moduleInstance) @@ -2139,7 +2142,8 @@ func WithDemoMode(demoMode bool) Option { func WithSubscriptionHooks(cfg config.SubscriptionHooksConfiguration) Option { return func(r *Router) { - r.subscriptionHooks.maxConcurrentOnReceiveHooks = cfg.MaxConcurrentEventReceiveHandlers + r.subscriptionHooks.onReceiveEvents.maxConcurrentHandlers = cfg.OnReceiveEvents.MaxConcurrentHandlers + r.subscriptionHooks.onReceiveEvents.timeout = cfg.OnReceiveEvents.HandlerTimeout } } diff --git a/router/core/router_config.go b/router/core/router_config.go index b0027b3e8a..8616946eac 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -27,10 +27,23 @@ import ( ) type subscriptionHooks struct { - onStart []func(ctx SubscriptionOnStartHandlerContext) error - onPublishEvents []func(ctx StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) - onReceiveEvents []func(ctx StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) - maxConcurrentOnReceiveHooks int + onStart onStartHooks + onPublishEvents onPublishEventsHooks + onReceiveEvents onReceiveEventsHooks +} + +type onStartHooks struct { + handlers []func(ctx SubscriptionOnStartHandlerContext) error +} + +type onPublishEventsHooks struct { + handlers []func(ctx StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) +} + +type onReceiveEventsHooks struct { + handlers []func(ctx StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) + maxConcurrentHandlers int + timeout time.Duration } type Config struct { diff --git a/router/core/subscriptions_modules.go b/router/core/subscriptions_modules.go index bcfaaae114..90ca6cf636 100644 --- a/router/core/subscriptions_modules.go +++ b/router/core/subscriptions_modules.go @@ -191,9 +191,22 @@ func NewPubSubSubscriptionOnStartHook(fn func(ctx SubscriptionOnStartHandlerCont return func(resolveCtx resolve.StartupHookContext, subConf datasource.SubscriptionEventConfiguration, eventBuilder datasource.EventBuilderFn) error { requestContext := getRequestContext(resolveCtx.Context) + + logger := requestContext.Logger() + if logger != nil { + logger = logger.With(zap.String("component", "pubsub_subscription_on_start_hook")) + if subConf != nil { + logger = logger.With( + zap.String("provider_id", subConf.ProviderID()), + zap.String("provider_type", string(subConf.ProviderType())), + zap.String("field_name", subConf.RootFieldName()), + ) + } + } + hookCtx := &pubSubSubscriptionOnStartHookContext{ request: requestContext.Request(), - logger: requestContext.Logger(), + logger: logger, operation: requestContext.Operation(), authentication: requestContext.Authentication(), subscriptionEventConfiguration: subConf, @@ -213,9 +226,15 @@ func NewEngineSubscriptionOnStartHook(fn func(ctx SubscriptionOnStartHandlerCont return func(resolveCtx resolve.StartupHookContext, input []byte) error { requestContext := getRequestContext(resolveCtx.Context) + + logger := requestContext.Logger() + if logger != nil { + logger = logger.With(zap.String("component", "engine_subscription_on_start_hook")) + } + hookCtx := &engineSubscriptionOnStartHookContext{ request: requestContext.Request(), - logger: requestContext.Logger(), + logger: logger, operation: requestContext.Operation(), authentication: requestContext.Authentication(), writeEventHook: resolveCtx.Updater, @@ -226,6 +245,9 @@ func NewEngineSubscriptionOnStartHook(fn func(ctx SubscriptionOnStartHandlerCont } type StreamReceiveEventHandlerContext interface { + // Context is a context for handlers. + // If it is cancelled, the handler should stop processing. + Context() context.Context // Request is the initial client request that started the subscription Request() *http.Request // Logger is the logger for the request @@ -283,9 +305,22 @@ func NewPubSubOnPublishEventsHook(fn func(ctx StreamPublishEventHandlerContext, return func(ctx context.Context, pubConf datasource.PublishEventConfiguration, evts []datasource.StreamEvent, eventBuilder datasource.EventBuilderFn) ([]datasource.StreamEvent, error) { requestContext := getRequestContext(ctx) + + logger := requestContext.Logger() + if logger != nil { + logger = logger.With(zap.String("component", "on_publish_events_hook")) + if pubConf != nil { + logger = logger.With( + zap.String("provider_id", pubConf.ProviderID()), + zap.String("provider_type", string(pubConf.ProviderType())), + zap.String("field_name", pubConf.RootFieldName()), + ) + } + } + hookCtx := &pubSubPublishEventHookContext{ request: requestContext.Request(), - logger: requestContext.Logger(), + logger: logger, operation: requestContext.Operation(), authentication: requestContext.Authentication(), publishEventConfiguration: pubConf, @@ -305,6 +340,11 @@ type pubSubStreamReceiveEventHookContext struct { authentication authentication.Authentication subscriptionEventConfiguration datasource.SubscriptionEventConfiguration eventBuilder datasource.EventBuilderFn + context context.Context +} + +func (c *pubSubStreamReceiveEventHookContext) Context() context.Context { + return c.context } func (c *pubSubStreamReceiveEventHookContext) Request() *http.Request { @@ -336,15 +376,29 @@ func NewPubSubOnReceiveEventsHook(fn func(ctx StreamReceiveEventHandlerContext, return nil } - return func(ctx context.Context, subConf datasource.SubscriptionEventConfiguration, eventBuilder datasource.EventBuilderFn, evts []datasource.StreamEvent) ([]datasource.StreamEvent, error) { - requestContext := getRequestContext(ctx) + return func(subscriptionCtx context.Context, updaterCtx context.Context, subConf datasource.SubscriptionEventConfiguration, eventBuilder datasource.EventBuilderFn, evts []datasource.StreamEvent) ([]datasource.StreamEvent, error) { + requestContext := getRequestContext(subscriptionCtx) + + logger := requestContext.Logger() + if logger != nil { + logger = logger.With(zap.String("component", "on_receive_events_hook")) + if subConf != nil { + logger = logger.With( + zap.String("provider_id", subConf.ProviderID()), + zap.String("provider_type", string(subConf.ProviderType())), + zap.String("field_name", subConf.RootFieldName()), + ) + } + } + hookCtx := &pubSubStreamReceiveEventHookContext{ request: requestContext.Request(), - logger: requestContext.Logger(), + logger: logger, operation: requestContext.Operation(), authentication: requestContext.Authentication(), subscriptionEventConfiguration: subConf, eventBuilder: eventBuilder, + context: updaterCtx, } newEvts, err := fn(hookCtx, datasource.NewStreamEvents(evts)) return newEvts.Unsafe(), err diff --git a/router/demo.config.yaml b/router/demo.config.yaml index 2a081e74be..9a72e31de2 100644 --- a/router/demo.config.yaml +++ b/router/demo.config.yaml @@ -19,4 +19,4 @@ events: redis: - id: my-redis urls: - - "redis://localhost:6379/2" + - "redis://localhost:6379/2" \ No newline at end of file diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index f386b27156..bb8c910982 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -646,7 +646,12 @@ type EventsConfiguration struct { } type SubscriptionHooksConfiguration struct { - MaxConcurrentEventReceiveHandlers int `yaml:"max_concurrent_event_receive_handlers" envDefault:"100"` + OnReceiveEvents OnReceiveEventsConfiguration `yaml:"on_receive_events"` +} + +type OnReceiveEventsConfiguration struct { + MaxConcurrentHandlers int `yaml:"max_concurrent_handlers" envDefault:"100"` + HandlerTimeout time.Duration `yaml:"handler_timeout" envDefault:"5s"` } type Cluster struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index f800e07245..a2326c86ec 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2310,11 +2310,23 @@ "description": "Configuration for subscription custom modules that are executed when events are received from a broker.", "additionalProperties": false, "properties": { - "max_concurrent_event_receive_handlers": { - "type": "integer", - "description": "The maximum number of concurrent event receive handlers. This controls the concurrency of the OnReceiveEvents custom modules.", - "minimum": 1, - "default": 100 + "on_receive_events": { + "type": "object", + "description": "Configuration for the OnReceiveEvents hook that is called when events are received from a broker.", + "additionalProperties": false, + "properties": { + "max_concurrent_handlers": { + "type": "integer", + "description": "The maximum number of concurrent event receive handlers. This controls the concurrency of the OnReceiveEvents custom modules.", + "minimum": 1, + "default": 100 + }, + "handler_timeout": { + "type": "string", + "description": "The amount of time that OnReceiveEvents handlers can run in total for a single batch of events. Specify as a duration string (e.g., '5s', '1m', '500ms').", + "default": "5s" + } + } } } } diff --git a/router/pkg/config/fixtures/full.yaml b/router/pkg/config/fixtures/full.yaml index ee5b7d8ef8..3f79ed83df 100644 --- a/router/pkg/config/fixtures/full.yaml +++ b/router/pkg/config/fixtures/full.yaml @@ -331,7 +331,9 @@ events: - 'redis://localhost:6379/11' cluster_enabled: true subscription_hooks: - max_concurrent_event_receive_handlers: 100 + on_receive_events: + max_concurrent_handlers: 100 + handler_timeout: 5s engine: enable_single_flight: true diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 40ce94c15a..7655cc7007 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -297,7 +297,10 @@ "Redis": null }, "SubscriptionHooks": { - "MaxConcurrentEventReceiveHandlers": 100 + "OnReceiveEvents": { + "MaxConcurrentHandlers": 100, + "HandlerTimeout": 5000000000 + } } }, "CacheWarmup": { diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 5a09b35205..2731dee760 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -643,7 +643,10 @@ ] }, "SubscriptionHooks": { - "MaxConcurrentEventReceiveHandlers": 100 + "OnReceiveEvents": { + "MaxConcurrentHandlers": 100, + "HandlerTimeout": 5000000000 + } } }, "CacheWarmup": { diff --git a/router/pkg/pubsub/datasource/hooks.go b/router/pkg/pubsub/datasource/hooks.go index a2e53e7183..a262058463 100644 --- a/router/pkg/pubsub/datasource/hooks.go +++ b/router/pkg/pubsub/datasource/hooks.go @@ -2,6 +2,7 @@ package datasource import ( "context" + "time" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) @@ -10,12 +11,28 @@ type SubscriptionOnStartFn func(ctx resolve.StartupHookContext, subConf Subscrip type OnPublishEventsFn func(ctx context.Context, pubConf PublishEventConfiguration, evts []StreamEvent, eventBuilder EventBuilderFn) ([]StreamEvent, error) -type OnReceiveEventsFn func(ctx context.Context, subConf SubscriptionEventConfiguration, eventBuilder EventBuilderFn, evts []StreamEvent) ([]StreamEvent, error) +type OnReceiveEventsFn func(subscriptionCtx context.Context, updaterCtx context.Context, subConf SubscriptionEventConfiguration, eventBuilder EventBuilderFn, evts []StreamEvent) ([]StreamEvent, error) // Hooks contains hooks for the pubsub providers and data sources type Hooks struct { - SubscriptionOnStart []SubscriptionOnStartFn - OnReceiveEvents []OnReceiveEventsFn - OnPublishEvents []OnPublishEventsFn - MaxConcurrentOnReceiveHandlers int + SubscriptionOnStart SubscriptionOnStartHooks + OnPublishEvents OnPublishEventsHooks + OnReceiveEvents OnReceiveEventsHooks +} + +// SubscriptionOnStartHooks contains hooks with settings for subscription starts +type SubscriptionOnStartHooks struct { + Handlers []SubscriptionOnStartFn +} + +// OnPublishEventsHooks contains hooks with settings for event publishing +type OnPublishEventsHooks struct { + Handlers []OnPublishEventsFn +} + +// OnReceiveEventsHooks contains hooks with settings for event receiving +type OnReceiveEventsHooks struct { + Handlers []OnReceiveEventsFn + MaxConcurrentHandlers int + Timeout time.Duration } diff --git a/router/pkg/pubsub/datasource/pubsubprovider.go b/router/pkg/pubsub/datasource/pubsubprovider.go index e20f1ace2b..1920bc2b46 100644 --- a/router/pkg/pubsub/datasource/pubsubprovider.go +++ b/router/pkg/pubsub/datasource/pubsubprovider.go @@ -41,7 +41,7 @@ func (p *PubSubProvider) applyPublishEventHooks(ctx context.Context, cfg Publish }() currentEvents = events - for _, hook := range p.hooks.OnPublishEvents { + for _, hook := range p.hooks.OnPublishEvents.Handlers { var err error currentEvents, err = hook(ctx, cfg, currentEvents, p.eventBuilder) currentEvents = slices.DeleteFunc(currentEvents, func(event StreamEvent) bool { @@ -90,7 +90,7 @@ func (p *PubSubProvider) Subscribe(ctx context.Context, cfg SubscriptionEventCon } func (p *PubSubProvider) Publish(ctx context.Context, cfg PublishEventConfiguration, events []StreamEvent) error { - if len(p.hooks.OnPublishEvents) == 0 { + if len(p.hooks.OnPublishEvents.Handlers) == 0 { return p.Adapter.Publish(ctx, cfg, events) } diff --git a/router/pkg/pubsub/datasource/pubsubprovider_test.go b/router/pkg/pubsub/datasource/pubsubprovider_test.go index 0bf12e7f60..b956ab38f0 100644 --- a/router/pkg/pubsub/datasource/pubsubprovider_test.go +++ b/router/pkg/pubsub/datasource/pubsubprovider_test.go @@ -253,7 +253,9 @@ func TestProvider_Publish_WithHooks_Success(t *testing.T) { provider := PubSubProvider{ Adapter: mockAdapter, hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{testHook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{testHook}, + }, }, eventBuilder: testPubSubEventBuilder, } @@ -286,7 +288,9 @@ func TestProvider_Publish_WithHooks_HookError(t *testing.T) { provider := PubSubProvider{ Adapter: mockAdapter, hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{testHook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{testHook}, + }, }, Logger: zap.NewNop(), eventBuilder: testPubSubEventBuilder, @@ -322,7 +326,9 @@ func TestProvider_Publish_WithHooks_AdapterError(t *testing.T) { provider := PubSubProvider{ Adapter: mockAdapter, hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{testHook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{testHook}, + }, }, eventBuilder: testPubSubEventBuilder, } @@ -358,7 +364,9 @@ func TestProvider_Publish_WithMultipleHooks_Success(t *testing.T) { provider := PubSubProvider{ Adapter: mockAdapter, hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook1, hook2}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook1, hook2}, + }, }, eventBuilder: testPubSubEventBuilder, } @@ -375,7 +383,9 @@ func TestProvider_SetHooks(t *testing.T) { } hooks := Hooks{ - OnPublishEvents: []OnPublishEventsFn{testHook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{testHook}, + }, } provider.SetHooks(hooks) @@ -396,7 +406,7 @@ func TestNewPubSubProvider(t *testing.T) { assert.Equal(t, typeID, provider.TypeID()) assert.Equal(t, mockAdapter, provider.Adapter) assert.Equal(t, logger, provider.Logger) - assert.Empty(t, provider.hooks.OnPublishEvents) + assert.Empty(t, provider.hooks.OnPublishEvents.Handlers) } func TestApplyPublishEventHooks_NoHooks(t *testing.T) { @@ -412,7 +422,9 @@ func TestApplyPublishEventHooks_NoHooks(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{}, + }, }, } @@ -443,7 +455,9 @@ func TestApplyPublishEventHooks_SingleHook_Success(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook}, + }, }, } @@ -472,7 +486,9 @@ func TestApplyPublishEventHooks_SingleHook_Error(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook}, + }, }, } @@ -507,7 +523,9 @@ func TestApplyPublishEventHooks_MultipleHooks_Success(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook1, hook2, hook3}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook1, hook2, hook3}, + }, }, } @@ -543,7 +561,9 @@ func TestApplyPublishEventHooks_MultipleHooks_MiddleHookError(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook1, hook2, hook3}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook1, hook2, hook3}, + }, }, } @@ -599,7 +619,9 @@ func TestApplyPublishEventHooks_PanicRecovery(t *testing.T) { provider := &PubSubProvider{ Logger: zap.NewNop(), hooks: Hooks{ - OnPublishEvents: []OnPublishEventsFn{hook}, + OnPublishEvents: OnPublishEventsHooks{ + Handlers: []OnPublishEventsFn{hook}, + }, }, } diff --git a/router/pkg/pubsub/datasource/subscription_datasource.go b/router/pkg/pubsub/datasource/subscription_datasource.go index c625af9c33..9285d6cfb7 100644 --- a/router/pkg/pubsub/datasource/subscription_datasource.go +++ b/router/pkg/pubsub/datasource/subscription_datasource.go @@ -46,7 +46,14 @@ func (s *PubSubSubscriptionDataSource[C]) Start(ctx *resolve.Context, input []by return errors.New("invalid subscription configuration") } - return s.pubSub.Subscribe(ctx.Context(), conf, NewSubscriptionEventUpdater(conf, s.hooks, updater, s.logger, s.eventBuilder)) + logger := s.logger.With( + zap.String("component", "subscription_event_updater"), + zap.String("provider_id", conf.ProviderID()), + zap.String("provider_type", string(conf.ProviderType())), + zap.String("field_name", conf.RootFieldName()), + ) + + return s.pubSub.Subscribe(ctx.Context(), conf, NewSubscriptionEventUpdater(conf, s.hooks, updater, logger, s.eventBuilder)) } func (s *PubSubSubscriptionDataSource[C]) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) (err error) { @@ -66,7 +73,7 @@ func (s *PubSubSubscriptionDataSource[C]) SubscriptionOnStart(ctx resolve.Startu } }() - for _, fn := range s.hooks.SubscriptionOnStart { + for _, fn := range s.hooks.SubscriptionOnStart.Handlers { conf, errConf := s.SubscriptionEventConfiguration(input) if errConf != nil { return err diff --git a/router/pkg/pubsub/datasource/subscription_datasource_test.go b/router/pkg/pubsub/datasource/subscription_datasource_test.go index a292f4b0f4..6e2d957a07 100644 --- a/router/pkg/pubsub/datasource/subscription_datasource_test.go +++ b/router/pkg/pubsub/datasource/subscription_datasource_test.go @@ -233,7 +233,9 @@ func TestPubSubSubscriptionDataSource_SubscriptionOnStart_WithHooks(t *testing.T } dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook1, hook2}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook1, hook2}, + }, }) testConfig := testSubscriptionEventConfiguration{ @@ -270,7 +272,9 @@ func TestPubSubSubscriptionDataSource_SubscriptionOnStart_HookReturnsClose(t *te } dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook}, + }, }) testConfig := testSubscriptionEventConfiguration{ @@ -304,7 +308,9 @@ func TestPubSubSubscriptionDataSource_SubscriptionOnStart_HookReturnsError(t *te } dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook}, + }, }) testConfig := testSubscriptionEventConfiguration{ @@ -333,7 +339,7 @@ func TestPubSubSubscriptionDataSource_SetSubscriptionOnStartFns(t *testing.T) { dataSource := NewPubSubSubscriptionDataSource[testSubscriptionEventConfiguration](mockAdapter, uniqueRequestIDFn, zap.NewNop(), testSubscriptionDataSourceEventBuilder) // Initially should have no hooks - assert.Len(t, dataSource.hooks.SubscriptionOnStart, 0) + assert.Len(t, dataSource.hooks.SubscriptionOnStart.Handlers, 0) // Add hooks hook1 := func(ctx resolve.StartupHookContext, config SubscriptionEventConfiguration, eventBuilder EventBuilderFn) error { @@ -344,14 +350,18 @@ func TestPubSubSubscriptionDataSource_SetSubscriptionOnStartFns(t *testing.T) { } dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook1}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook1}, + }, }) - assert.Len(t, dataSource.hooks.SubscriptionOnStart, 1) + assert.Len(t, dataSource.hooks.SubscriptionOnStart.Handlers, 1) dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook2}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook2}, + }, }) - assert.Len(t, dataSource.hooks.SubscriptionOnStart, 1) + assert.Len(t, dataSource.hooks.SubscriptionOnStart.Handlers, 1) } func TestNewPubSubSubscriptionDataSource(t *testing.T) { @@ -365,7 +375,7 @@ func TestNewPubSubSubscriptionDataSource(t *testing.T) { assert.NotNil(t, dataSource) assert.Equal(t, mockAdapter, dataSource.pubSub) assert.NotNil(t, dataSource.uniqueRequestID) - assert.Empty(t, dataSource.hooks.SubscriptionOnStart) + assert.Empty(t, dataSource.hooks.SubscriptionOnStart.Handlers) } func TestPubSubSubscriptionDataSource_InterfaceCompliance(t *testing.T) { @@ -424,7 +434,9 @@ func TestPubSubSubscriptionDataSource_SubscriptionOnStart_PanicRecovery(t *testi } dataSource.SetHooks(Hooks{ - SubscriptionOnStart: []SubscriptionOnStartFn{hook}, + SubscriptionOnStart: SubscriptionOnStartHooks{ + Handlers: []SubscriptionOnStartFn{hook}, + }, }) testConfig := testSubscriptionEventConfiguration{ diff --git a/router/pkg/pubsub/datasource/subscription_event_updater.go b/router/pkg/pubsub/datasource/subscription_event_updater.go index 5ed4a6c837..0dd1060faf 100644 --- a/router/pkg/pubsub/datasource/subscription_event_updater.go +++ b/router/pkg/pubsub/datasource/subscription_event_updater.go @@ -4,12 +4,18 @@ import ( "context" "slices" "sync" + "time" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +const ( + timeoutGracePeriod = 50 * time.Millisecond + defaultTimeout = 5 * time.Second +) + // SubscriptionEventUpdater is a wrapper around the SubscriptionUpdater interface // that provides a way to send the event struct instead of the raw data // It is used to give access to the event additional fields to the hooks. @@ -26,10 +32,12 @@ type subscriptionEventUpdater struct { hooks Hooks logger *zap.Logger eventBuilder EventBuilderFn + semaphore chan struct{} + timeout time.Duration } func (s *subscriptionEventUpdater) Update(events []StreamEvent) { - if len(s.hooks.OnReceiveEvents) == 0 { + if len(s.hooks.OnReceiveEvents.Handlers) == 0 { for _, event := range events { s.eventUpdater.Update(event.GetData()) } @@ -37,27 +45,37 @@ func (s *subscriptionEventUpdater) Update(events []StreamEvent) { } subscriptions := s.eventUpdater.Subscriptions() - limit := max(s.hooks.MaxConcurrentOnReceiveHandlers, 1) - semaphore := make(chan struct{}, limit) wg := sync.WaitGroup{} - errCh := make(chan error, len(subscriptions)) + updaterCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(s.timeout)) + defer cancel() - for ctx, subId := range subscriptions { - semaphore <- struct{}{} // Acquire a slot - wg.Add(1) - go s.updateSubscription(ctx, &wg, errCh, semaphore, subId, events) - } + done := make(chan struct{}) - doneLogging := make(chan struct{}) go func() { - s.deduplicateAndLogErrors(errCh, len(subscriptions)) - doneLogging <- struct{}{} + for subCtx, subId := range subscriptions { + s.semaphore <- struct{}{} // Acquire slot, blocks if all slots are taken + wg.Add(1) + go s.updateSubscription(subCtx, updaterCtx, &wg, subId, events) + } + + wg.Wait() + close(done) }() - wg.Wait() - close(semaphore) - close(errCh) - <-doneLogging + select { + case <-done: + s.logger.Debug("All subscription updates completed") + // All subscriptions completed successfully + case <-time.After(s.timeout + timeoutGracePeriod): + // Timeout exceeded, some subscription updates may still be running. + // We can't stop them but we will also not wait for them, basically abandoning them. + // They will continue to hold their semaphore slots until they complete, + // which means the next Update() call will have fewer available slots. + // Also since we will process the next batch of events while having abandoned updaters, + // those updaters might eventually push their events to the subscription late, + // which means events might arrive out of order. + s.logger.Warn("Timeout exceeded during subscription updates, events may arrive out of order") + } } func (s *subscriptionEventUpdater) Complete() { @@ -66,34 +84,31 @@ func (s *subscriptionEventUpdater) Complete() { func (s *subscriptionEventUpdater) Close(kind resolve.SubscriptionCloseKind) { s.eventUpdater.Close(kind) + close(s.semaphore) } func (s *subscriptionEventUpdater) SetHooks(hooks Hooks) { s.hooks = hooks } -func (s *subscriptionEventUpdater) updateSubscription(ctx context.Context, wg *sync.WaitGroup, errCh chan error, semaphore chan struct{}, subID resolve.SubscriptionIdentifier, events []StreamEvent) { +func (s *subscriptionEventUpdater) updateSubscription(subscriptionCtx context.Context, updaterCtx context.Context, wg *sync.WaitGroup, subID resolve.SubscriptionIdentifier, events []StreamEvent) { defer wg.Done() defer func() { if r := recover(); r != nil { s.recoverPanic(subID, r) } - <-semaphore // release the slot when done + <-s.semaphore // release the slot when done }() - hooks := s.hooks.OnReceiveEvents + hooks := s.hooks.OnReceiveEvents.Handlers // modify events with hooks var err error for i := range hooks { - events, err = hooks[i](ctx, s.subscriptionEventConfiguration, s.eventBuilder, events) + events, err = hooks[i](subscriptionCtx, updaterCtx, s.subscriptionEventConfiguration, s.eventBuilder, events) events = slices.DeleteFunc(events, func(event StreamEvent) bool { return event == nil }) - - if err != nil { - errCh <- err - } } // send events to the subscription, @@ -120,36 +135,6 @@ func (s *subscriptionEventUpdater) recoverPanic(subID resolve.SubscriptionIdenti s.eventUpdater.CloseSubscription(resolve.SubscriptionCloseKindDownstreamServiceError, subID) } -// deduplicateAndLogErrors collects errors from errCh -// and deduplicates them based on their err.Error() value. -// Afterwards it uses s.logger to log the message. -func (s *subscriptionEventUpdater) deduplicateAndLogErrors(errCh chan error, size int) { - if s.logger == nil { - return - } - - errs := make(map[string]int, size) - for err := range errCh { - amount, found := errs[err.Error()] - if found { - errs[err.Error()] = amount + 1 - continue - } - errs[err.Error()] = 1 - } - - for err, amount := range errs { - s.logger.Error( - "some handlers have thrown an error", - zap.String("error", err), - zap.Int("amount_handlers", amount), - zap.String("provider_type", string(s.subscriptionEventConfiguration.ProviderType())), - zap.String("provider_id", s.subscriptionEventConfiguration.ProviderID()), - zap.String("field_name", s.subscriptionEventConfiguration.RootFieldName()), - ) - } -} - func NewSubscriptionEventUpdater( cfg SubscriptionEventConfiguration, hooks Hooks, @@ -157,11 +142,19 @@ func NewSubscriptionEventUpdater( logger *zap.Logger, eventBuilder EventBuilderFn, ) SubscriptionEventUpdater { + limit := max(hooks.OnReceiveEvents.MaxConcurrentHandlers, 1) + timeout := hooks.OnReceiveEvents.Timeout + if timeout == 0 { + timeout = defaultTimeout + } + return &subscriptionEventUpdater{ subscriptionEventConfiguration: cfg, hooks: hooks, eventUpdater: eventUpdater, logger: logger, eventBuilder: eventBuilder, + semaphore: make(chan struct{}, limit), + timeout: timeout, } } diff --git a/router/pkg/pubsub/datasource/subscription_event_updater_test.go b/router/pkg/pubsub/datasource/subscription_event_updater_test.go index 1b6c1bd3a7..e8fceabf2e 100644 --- a/router/pkg/pubsub/datasource/subscription_event_updater_test.go +++ b/router/pkg/pubsub/datasource/subscription_event_updater_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -58,11 +59,13 @@ func TestSubscriptionEventUpdater_Update_NoHooks(t *testing.T) { mockUpdater.On("Update", []byte("test data 1")).Return() mockUpdater.On("Update", []byte("test data 2")).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, // No hooks - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, // No hooks + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(events) } @@ -84,7 +87,7 @@ func TestSubscriptionEventUpdater_UpdateSubscription_WithHooks_Success(t *testin // Create wrapper function for the mock receivedArgs := make(chan receivedHooksArgs, 1) - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { receivedArgs <- receivedHooksArgs{events: events, cfg: cfg} return modifiedEvents, nil } @@ -96,14 +99,17 @@ func TestSubscriptionEventUpdater_UpdateSubscription_WithHooks_Success(t *testin context.Background(): subId, }) - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, }, - eventBuilder: testEventBuilder, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(originalEvents) @@ -129,7 +135,7 @@ func TestSubscriptionEventUpdater_UpdateSubscriptions_WithHooks_Error(t *testing hookError := errors.New("hook processing error") // Define hook that returns an error - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { return nil, hookError } @@ -141,14 +147,17 @@ func TestSubscriptionEventUpdater_UpdateSubscriptions_WithHooks_Error(t *testing mockUpdater.On("CloseSubscription", resolve.SubscriptionCloseKindNormal, subId).Return() // Should not call Update or UpdateSubscription on eventUpdater since hook fails - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, }, - eventBuilder: testEventBuilder, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(events) @@ -171,13 +180,13 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooks_Success(t *testing.T) // Chain of hooks that modify the data receivedArgs1 := make(chan receivedHooksArgs, 1) - hook1 := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook1 := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { receivedArgs1 <- receivedHooksArgs{events: events, cfg: cfg} return []StreamEvent{&testEvent{mutableTestEvent("modified by hook1")}}, nil } receivedArgs2 := make(chan receivedHooksArgs, 1) - hook2 := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook2 := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { receivedArgs2 <- receivedHooksArgs{events: events, cfg: cfg} return []StreamEvent{&testEvent{mutableTestEvent("modified by hook2")}}, nil } @@ -189,14 +198,17 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooks_Success(t *testing.T) context.Background(): subId, }) - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{hook1, hook2}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{hook1, hook2}, + }, }, - eventBuilder: testEventBuilder, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(originalEvents) @@ -227,11 +239,13 @@ func TestSubscriptionEventUpdater_Complete(t *testing.T) { mockUpdater.On("Complete").Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Complete() } @@ -247,11 +261,13 @@ func TestSubscriptionEventUpdater_Close(t *testing.T) { mockUpdater.On("Close", closeKind).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Close(closeKind) } @@ -264,24 +280,30 @@ func TestSubscriptionEventUpdater_SetHooks(t *testing.T) { fieldName: "testField", } - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { return events, nil } hooks := Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, } - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, - eventBuilder: testEventBuilder, - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.SetHooks(hooks) - assert.Equal(t, hooks, updater.hooks) + // Type assert to access internal fields for testing + concreteUpdater, ok := updater.(*subscriptionEventUpdater) + require.True(t, ok) + assert.Equal(t, hooks, concreteUpdater.hooks) } func TestNewSubscriptionEventUpdater(t *testing.T) { @@ -292,12 +314,14 @@ func TestNewSubscriptionEventUpdater(t *testing.T) { fieldName: "testField", } - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { return events, nil } hooks := Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, } updater := NewSubscriptionEventUpdater(config, hooks, mockUpdater, zap.NewNop(), testEventBuilder) @@ -331,11 +355,13 @@ func TestSubscriptionEventUpdater_Update_PassthroughWithNoHooks(t *testing.T) { mockUpdater.On("Update", []byte("event data 2")).Return() mockUpdater.On("Update", []byte("event data 3")).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, // No hooks - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, // No hooks + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(events) @@ -359,7 +385,7 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookModification(t *testing.T } // Hook that modifies events by adding a prefix - hook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { modifiedEvents := make([]StreamEvent, len(events)) for i, event := range events { modifiedData := "modified: " + string(event.GetData()) @@ -377,13 +403,17 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookModification(t *testing.T mockUpdater.On("UpdateSubscription", subId, []byte("modified: original data 1")).Return() mockUpdater.On("UpdateSubscription", subId, []byte("modified: original data 2")).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{hook}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{hook}, + }, }, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(originalEvents) @@ -395,7 +425,7 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookModification(t *testing.T mockUpdater.AssertNotCalled(t, "Update") } -func TestSubscriptionEventUpdater_Update_WithSingleHookError_ClosesSubscriptionAndLogsError(t *testing.T) { +func TestSubscriptionEventUpdater_Update_WithSingleHookError_ClosesSubscription(t *testing.T) { mockUpdater := NewMockSubscriptionUpdater(t) config := &testSubscriptionEventConfig{ providerID: "test-provider", @@ -408,15 +438,11 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookError_ClosesSubscriptionA hookError := errors.New("hook processing failed") // Hook that returns an error - hook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { // Return the events but also return an error return events, hookError } - // Set up logger with observer to verify error logging - zCore, logObserver := observer.New(zap.InfoLevel) - logger := zap.New(zCore) - subId := resolve.SubscriptionIdentifier{ConnectionID: 1, SubscriptionID: 1} mockUpdater.On("Subscriptions").Return(map[context.Context]resolve.SubscriptionIdentifier{ context.Background(): subId, @@ -427,8 +453,10 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookError_ClosesSubscriptionA mockUpdater.On("CloseSubscription", resolve.SubscriptionCloseKindNormal, subId).Return() updater := NewSubscriptionEventUpdater(config, Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{hook}, - }, mockUpdater, logger, testEventBuilder) + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{hook}, + }, + }, mockUpdater, zap.NewNop(), testEventBuilder) updater.Update(events) @@ -438,16 +466,6 @@ func TestSubscriptionEventUpdater_Update_WithSingleHookError_ClosesSubscriptionA mockUpdater.AssertCalled(t, "CloseSubscription", resolve.SubscriptionCloseKindNormal, subId) // Update should NOT be called when hooks are present mockUpdater.AssertNotCalled(t, "Update") - - // Verify error was logged (logging happens asynchronously) - assert.Eventually(t, func() bool { - logs := logObserver.FilterMessageSnippet("some handlers have thrown an error").TakeAll() - if len(logs) != 1 { - return false - } - // Verify the logged error message contains our error - return logs[0].ContextMap()["error"] == hookError.Error() - }, time.Second, 10*time.Millisecond, "expected error to be logged") } func TestSubscriptionEventUpdater_Update_WithMultipleHooksChaining(t *testing.T) { @@ -467,7 +485,7 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooksChaining(t *testing.T) // Hook 1: Adds "step1: " prefix receivedArgs1 := make(chan receivedHooksArgs, 1) - hook1 := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook1 := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { mu.Lock() hookCallOrder = append(hookCallOrder, 1) mu.Unlock() @@ -482,7 +500,7 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooksChaining(t *testing.T) // Hook 2: Adds "step2: " prefix receivedArgs2 := make(chan receivedHooksArgs, 1) - hook2 := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook2 := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { mu.Lock() hookCallOrder = append(hookCallOrder, 2) mu.Unlock() @@ -497,7 +515,7 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooksChaining(t *testing.T) // Hook 3: Adds "step3: " prefix receivedArgs3 := make(chan receivedHooksArgs, 1) - hook3 := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + hook3 := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { mu.Lock() hookCallOrder = append(hookCallOrder, 3) mu.Unlock() @@ -517,13 +535,17 @@ func TestSubscriptionEventUpdater_Update_WithMultipleHooksChaining(t *testing.T) // Final modified data should have all three transformations applied mockUpdater.On("UpdateSubscription", subId, []byte("step3: step2: step1: original")).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{hook1, hook2, hook3}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{hook1, hook2, hook3}, + }, }, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(originalEvents) @@ -579,11 +601,13 @@ func TestSubscriptionEventUpdater_UpdateEvents_EmptyEvents(t *testing.T) { } events := []StreamEvent{} // Empty events - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, // No hooks - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, // No hooks + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Update(events) @@ -612,11 +636,13 @@ func TestSubscriptionEventUpdater_Close_WithDifferentCloseKinds(t *testing.T) { mockUpdater.On("Close", tc.closeKind).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{}, - } + updater := NewSubscriptionEventUpdater( + config, + Hooks{}, + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) updater.Close(tc.closeKind) }) @@ -650,18 +676,21 @@ func TestSubscriptionEventUpdater_UpdateSubscription_WithHookError_ClosesSubscri &testEvent{mutableTestEvent("test data")}, } - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { return events, tc.hookError } - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, }, - eventBuilder: testEventBuilder, - } + mockUpdater, + zap.NewNop(), + testEventBuilder, + ) subId := resolve.SubscriptionIdentifier{ConnectionID: 1, SubscriptionID: 1} mockUpdater.On("UpdateSubscription", subId, []byte("test data")).Return() @@ -677,50 +706,6 @@ func TestSubscriptionEventUpdater_UpdateSubscription_WithHookError_ClosesSubscri } } -func TestSubscriptionEventUpdater_UpdateSubscription_WithHooks_Error_LoggerWritesError(t *testing.T) { - mockUpdater := NewMockSubscriptionUpdater(t) - config := &testSubscriptionEventConfig{ - providerID: "test-provider", - providerType: ProviderTypeNats, - fieldName: "testField", - } - events := []StreamEvent{ - &testEvent{mutableTestEvent("test data")}, - } - hookError := errors.New("hook processing error") - - // Define hook that returns an error - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { - return nil, hookError - } - - zCore, logObserver := observer.New(zap.InfoLevel) - logger := zap.New(zCore) - - // Test with a real zap logger to verify error logging behavior - // The logger.Error() call should be executed when an error occurs - updater := NewSubscriptionEventUpdater(config, Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, - }, mockUpdater, logger, testEventBuilder) - - subId := resolve.SubscriptionIdentifier{ConnectionID: 1, SubscriptionID: 1} - mockUpdater.On("Subscriptions").Return(map[context.Context]resolve.SubscriptionIdentifier{ - context.Background(): subId, - }) - mockUpdater.On("CloseSubscription", resolve.SubscriptionCloseKindNormal, subId).Return() - - updater.Update(events) - - // Assert that Update was not called on the eventUpdater - mockUpdater.AssertNotCalled(t, "UpdateSubscription") - mockUpdater.AssertCalled(t, "CloseSubscription", resolve.SubscriptionCloseKindNormal, subId) - - // log error messages for hooks are written async, we need to wait for them to be written - assert.Eventually(t, func() bool { - return len(logObserver.FilterMessageSnippet("some handlers have thrown an error").TakeAll()) == 1 - }, time.Second, 10*time.Millisecond, "expected one deduplicated error log") -} - func TestSubscriptionEventUpdater_OnReceiveEvents_PanicRecovery(t *testing.T) { panicErr := errors.New("panic error") @@ -758,7 +743,7 @@ func TestSubscriptionEventUpdater_OnReceiveEvents_PanicRecovery(t *testing.T) { } // Create hook that panics - testHook := func(ctx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { + testHook := func(subCtx context.Context, updaterCtx context.Context, cfg SubscriptionEventConfiguration, eventBuilder EventBuilderFn, events []StreamEvent) ([]StreamEvent, error) { panic(tt.panicValue) } @@ -768,14 +753,17 @@ func TestSubscriptionEventUpdater_OnReceiveEvents_PanicRecovery(t *testing.T) { }) mockUpdater.On("CloseSubscription", resolve.SubscriptionCloseKindDownstreamServiceError, subId).Return() - updater := &subscriptionEventUpdater{ - eventUpdater: mockUpdater, - subscriptionEventConfiguration: config, - hooks: Hooks{ - OnReceiveEvents: []OnReceiveEventsFn{testHook}, + updater := NewSubscriptionEventUpdater( + config, + Hooks{ + OnReceiveEvents: OnReceiveEventsHooks{ + Handlers: []OnReceiveEventsFn{testHook}, + }, }, - logger: logger, - } + mockUpdater, + logger, + testEventBuilder, + ) updater.Update(events) diff --git a/router/pkg/pubsub/pubsub_test.go b/router/pkg/pubsub/pubsub_test.go index 39444689ac..f0b99ec303 100644 --- a/router/pkg/pubsub/pubsub_test.go +++ b/router/pkg/pubsub/pubsub_test.go @@ -63,8 +63,8 @@ func TestBuild_OK(t *testing.T) { mockPubSubProvider.On("ID").Return("provider-1") mockPubSubProvider.On("SetHooks", datasource.Hooks{ - OnReceiveEvents: []datasource.OnReceiveEventsFn(nil), - OnPublishEvents: []datasource.OnPublishEventsFn(nil), + OnReceiveEvents: datasource.OnReceiveEventsHooks{Handlers: []datasource.OnReceiveEventsFn(nil)}, + OnPublishEvents: datasource.OnPublishEventsHooks{Handlers: []datasource.OnPublishEventsFn(nil)}, }).Return(nil) mockBuilder.On("TypeID").Return("nats") @@ -242,8 +242,8 @@ func TestBuild_ShouldNotInitializeProviderIfNotUsed(t *testing.T) { mockPubSubUsedProvider.On("ID").Return("provider-2") mockPubSubUsedProvider.On("SetHooks", datasource.Hooks{ - OnReceiveEvents: []datasource.OnReceiveEventsFn(nil), - OnPublishEvents: []datasource.OnPublishEventsFn(nil), + OnReceiveEvents: datasource.OnReceiveEventsHooks{Handlers: []datasource.OnReceiveEventsFn(nil)}, + OnPublishEvents: datasource.OnPublishEventsHooks{Handlers: []datasource.OnPublishEventsFn(nil)}, }).Return(nil) mockBuilder.On("TypeID").Return("nats")