feat(router): add a timeout for on_receive_event hooks #2329
Conversation
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI.

Walkthrough: This PR restructures the subscription hooks system by replacing flat hook slices with nested hook structs, introduces a new timeout configuration for event receive handlers alongside concurrency controls, and implements semaphore-based concurrency limiting with a deadline-based context in the subscription event updater to manage handler execution timing and ordering.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Router-nonroot image scan passed: ✅ No security vulnerabilities found in image.
Actionable comments posted: 3
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
router/pkg/pubsub/datasource/subscription_datasource.go (1)
76-85: Bug: returning the wrong error variable; unmarshal failures get swallowed. If SubscriptionEventConfiguration(input) fails, the code returns err (likely nil) instead of errConf, masking the error and continuing.
Apply this minimal fix and unmarshal once:
```diff
-	for _, fn := range s.hooks.SubscriptionOnStart.Handlers {
-		conf, errConf := s.SubscriptionEventConfiguration(input)
-		if errConf != nil {
-			return err
-		}
-		err = fn(ctx, conf, s.eventBuilder)
+	conf, errConf := s.SubscriptionEventConfiguration(input)
+	if errConf != nil {
+		return errConf
+	}
+	for _, fn := range s.hooks.SubscriptionOnStart.Handlers {
+		err = fn(ctx, conf, s.eventBuilder)
 		if err != nil {
 			return err
 		}
 	}
```

router/pkg/pubsub/datasource/subscription_event_updater.go (1)
103-120: Stop clobbering hook errors

Inside `updateSubscription` the loop assigns `err = hooks[i](...)` every iteration. If an earlier hook returns an error and a later hook succeeds, the final assignment overwrites the failure and the subscription stays open. That is a functional regression: any hook signalling an error should still force the close path. Capture the first non-nil error (and stop invoking the remaining hooks) so we preserve the contract.

```diff
-	var err error
-	for i := range hooks {
-		events, err = hooks[i](ctx, s.subscriptionEventConfiguration, s.eventBuilder, events)
+	var err error
+	for i := range hooks {
+		var hookErr error
+		events, hookErr = hooks[i](ctx, s.subscriptionEventConfiguration, s.eventBuilder, events)
 		events = slices.DeleteFunc(events, func(event StreamEvent) bool {
 			return event == nil
 		})
+		if hookErr != nil {
+			err = hookErr
+			break
+		}
 	}
```
🧹 Nitpick comments (2)
router/pkg/pubsub/datasource/pubsubprovider.go (1)
44-62: OnPublishEvents .Handlers migration looks correct. Loop and length checks now target .Handlers; behavior preserved.
Consider guarding p.Logger in error paths or defaulting to zap.NewNop() in NewPubSubProvider to avoid a possible nil deref if a nil logger is ever passed.
Also applies to: 93-105
router/pkg/config/config.schema.json (1)
2313-2330: Add duration validation for handler_timeout to match schema conventions. Many duration fields here use either "duration" with a minimum or "format": "go-duration". handler_timeout currently lacks validation.

Apply this diff:

```diff
 "handler_timeout": {
   "type": "string",
   "description": "The amount of time that OnReceiveEvents handlers can run in total for a single batch of events. Specify as a duration string (e.g., '5s', '1m', '500ms').",
-  "default": "5s"
+  "default": "5s",
+  "duration": {
+    "minimum": "1ms"
+  }
 }
```

Optional: if "0" should be disallowed (the router defaults to 5s on 0), keep the minimum > 0s as above; if "0" should disable the timeout, document it explicitly and accept "0s".
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
- router-tests/modules/stream_receive_test.go (3 hunks)
- router/core/factoryresolver.go (3 hunks)
- router/core/router.go (3 hunks)
- router/core/router_config.go (1 hunks)
- router/core/subscriptions_modules.go (6 hunks)
- router/demo.config.yaml (1 hunks)
- router/pkg/config/config.go (1 hunks)
- router/pkg/config/config.schema.json (1 hunks)
- router/pkg/config/fixtures/full.yaml (1 hunks)
- router/pkg/config/testdata/config_defaults.json (1 hunks)
- router/pkg/config/testdata/config_full.json (1 hunks)
- router/pkg/pubsub/datasource/hooks.go (2 hunks)
- router/pkg/pubsub/datasource/pubsubprovider.go (2 hunks)
- router/pkg/pubsub/datasource/pubsubprovider_test.go (12 hunks)
- router/pkg/pubsub/datasource/subscription_datasource.go (2 hunks)
- router/pkg/pubsub/datasource/subscription_datasource_test.go (7 hunks)
- router/pkg/pubsub/datasource/subscription_event_updater.go (4 hunks)
- router/pkg/pubsub/datasource/subscription_event_updater_test.go (18 hunks)
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.
Applied to files:
router/pkg/config/config.go, router/core/router.go, router/pkg/pubsub/datasource/subscription_event_updater.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router/core/router_config.go, router/pkg/pubsub/datasource/subscription_event_updater_test.go, router/core/router.go, router-tests/modules/stream_receive_test.go, router/pkg/pubsub/datasource/subscription_event_updater.go
📚 Learning: 2025-08-20T10:08:17.857Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2155
File: router/core/router.go:1857-1866
Timestamp: 2025-08-20T10:08:17.857Z
Learning: router/pkg/config/config.schema.json forbids null values for traffic_shaping.subgraphs: additionalProperties references $defs.traffic_shaping_subgraph_request_rule with type "object". Therefore, in core.NewSubgraphTransportOptions, dereferencing each subgraph rule pointer is safe under schema-validated configs, and a nil-check is unnecessary.
Applied to files:
router/pkg/config/config.schema.json, router/pkg/config/testdata/config_full.json
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router/pkg/pubsub/datasource/subscription_event_updater_test.go, router-tests/modules/stream_receive_test.go
📚 Learning: 2025-07-29T08:19:55.720Z
Learnt from: Noroth
Repo: wundergraph/cosmo PR: 2088
File: demo/pkg/subgraphs/projects/src/main_test.go:0-0
Timestamp: 2025-07-29T08:19:55.720Z
Learning: In Go testing, t.Fatal, t.FailNow, t.Skip* and similar methods cannot be called from goroutines other than the main test goroutine, as they will cause a runtime panic. Use channels, t.Error*, or other synchronization mechanisms to communicate errors from goroutines back to the main test thread.
Applied to files:
router-tests/modules/stream_receive_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_push_image (nonroot)
- GitHub Check: image_scan
- GitHub Check: build_test
- GitHub Check: image_scan (nonroot)
- GitHub Check: integration_test (./telemetry)
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: integration_test (./events)
🔇 Additional comments (7)
router/pkg/config/testdata/config_defaults.json (1)
300-304: Defaults align with router wiring. MaxConcurrentHandlers=100 and HandlerTimeout serialized to ns are consistent with code defaults.
router/pkg/pubsub/datasource/subscription_datasource.go (1)
49-56: Good: contextualized logger for updater. Adds clear fields (component/provider_id/provider_type/field_name) and passes the per-request logger.
router/pkg/config/testdata/config_full.json (1)
646-649: OnReceiveEvents fixture looks consistent. Names and default values match schema and router defaults.
router/pkg/config/fixtures/full.yaml (1)
334-336: Fixture matches new schema (names and types). String duration and concurrency key look correct.
router/core/router.go (3)
256-262: Sane defaults for onReceiveEvents. Zero-value guard sets maxConcurrentHandlers=100 and timeout=5s.
686-696: Updated hook registration to new handler slices. Migration to .handlers is correct for onStart/onPublishEvents/onReceiveEvents.
2145-2147: Config wiring maps to new nested fields. onReceiveEvents.{MaxConcurrentHandlers,HandlerTimeout} correctly populate runtime config.
alepane21
left a comment
I like what you did here, good work! I found some small issues.
Checklist
Motivation and Context
I use an analogy here to demonstrate what this change is all about, which hopefully describes everything accurately.
The Great Ice Cream Delivery Crisis: A Timeout Tale
Imagine an ice cream factory (Kafka) that's churning out delicious frozen treats (events) faster than you can say "brain freeze." These ice creams need to get to hungry customers (subscription clients) via our fleet of deliverymen (the router's pubsub system).
Each deliveryman has to pass through a traffic light (a user's potentially long-running hook) on their way to the customer. Usually it's green and they zoom right through. But sometimes? That light turns red. Really, REALLY red. Like, "stuck-for-5-minutes-questioning-your-life-choices" red.
Let's assume you have 3 deliverymen and 3 customers. The first two deliverymen make it over the green light; the third one has to wait in front of the red light (the hook is blocking for whatever reason). In the old implementation the two deliverymen will wait for the third deliveryman before picking up new ice cream (events) from the ice cream factory (Kafka). This makes the ice cream delivery as slow as the slowest of the three.
Solution idea
After this change the first two deliverymen will only wait for their third colleague until the timeout is exceeded before they pick up new ice cream (new events from the broker), antisocially but efficiently so.
Why even use a timeout
I need to break out of the ice cream example for a bit here.
The reason to use a timeout instead of simply moving on the moment a semaphore slot is free is that I try to honor event ordering and only break it when we exceed a timeout. We cannot stop a running goroutine; we can only abandon it. But it will still eventually progress and update a client's subscription. This could mean event #2 makes it to a subscriber faster than event #1 if the hook processing event #1 takes too long. By waiting with a timeout for long-running hooks we can mitigate this to a degree.
The timeout is configurable via config:
`events.subscription_hooks.on_receive_events.handler_timeout`

Always honor semaphore limit
If all deliverymen are stuck at the traffic light, then we will not pick up new ice cream from the factory. That was a problem before this change and is still one after it. The only way to avoid it would be to spawn new deliverymen even when the limit (the semaphore limit) is already reached. I did not opt for this and instead always honor the semaphore limit. There will never be more hook goroutines, er, deliverymen, than the configured semaphore limit. Otherwise we could end up in a situation where we have no control over the number of goroutines running on the router at a time. If the limit is too low, a user can raise it via configuration:
`events.subscription_hooks.on_receive_events.max_concurrent_handlers`

Let the hook developers know their time is over
Okay, that sounds more dramatic than it really is. What I mean by that is we need to let hook developers know that their hook execution must stop now. We as the pubsub system cannot stop the routine from the outside and have no control over the inside of the hook. But we can let the hook know to stop voluntarily via timeout contexts. The hook's context now has `ctx.Context()`, which is tied to the router's timeout mechanism. Once we stop waiting for the routine, the `ctx.Context()` is also cancelled. This gives hook developers the option to stop their routines and avoid running abandoned in the background.

Technical changes
- `component` key for better logging (unrelated, but small change)
- `ctx.Context()` on hooks, cancelled when the router stops waiting
- `events.subscription_hooks.on_receive_events.max_concurrent_handlers` (new path)
- `events.subscription_hooks.on_receive_events.handler_timeout`

Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Tests