fix: improve subscription latency by executing handlers async#2288
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis PR introduces a new subscription hooks configuration system with Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes The changes comprise straightforward configuration additions across multiple files following established patterns, balanced against a single complex implementation in Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Router-nonroot image scan passed✅ No security vulnerabilities found in image: |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (10)
router-tests/modules/stream_publish_test.go (10)
69-73: Duplicate of earlier approval for hook wiring.
97-98: Duplicate of earlier approval for WithSubscriptionHooks.
128-131: Duplicate of earlier approval for hook wiring.
147-148: Duplicate of earlier approval for WithSubscriptionHooks.
179-182: Duplicate of earlier approval for hook wiring.
198-199: Duplicate of earlier approval for WithSubscriptionHooks.
239-242: Duplicate of earlier approval for hook wiring.
258-259: Duplicate of earlier approval for WithSubscriptionHooks.
289-292: Duplicate of earlier approval for hook wiring.
326-327: Duplicate of earlier approval for WithSubscriptionHooks.
🧹 Nitpick comments (6)
router/pkg/pubsub/datasource/subscription_event_updater_test.go (1)
625-629: Avoid fixed sleeps in log assertions; use eventual assertionSleeping 10ms can flake under load/CI. Poll until the expected log appears within a timeout.
Replace the sleep + immediate check with an eventual assertion:
-// log error messages for hooks are written async, we need to wait for them to be written -time.Sleep(10 * time.Millisecond) - -msgs := logObserver.FilterMessageSnippet("some handlers have thrown an error").TakeAll() -assert.Equal(t, 1, len(msgs)) +assert.Eventually(t, func() bool { + return len(logObserver.FilterMessageSnippet("some handlers have thrown an error").TakeAll()) == 1 +}, time.Second, 10*time.Millisecond, "expected one deduplicated error log")router-tests/modules/stream_receive_test.go (2)
28-30: Avoid duplicating the default in testsHardcoding 100 couples tests to config defaults. Consider reading from a single source (e.g., a test helper or exported default) to avoid drift if the default changes.
50-55: Good: explicit wiring via core.WithSubscriptionHooksThis keeps tests clear. To reduce repetition, consider a small helper that returns common RouterOptions including WithSubscriptionHooks.
Also applies to: 151-155, 271-272, 410-411, 501-502
router/core/router.go (1)
2125-2129: Consider explicit validation for consistency, but immediate risk is mitigated downstreamThe downstream code already guards against 0/negative values. At
router/pkg/pubsub/datasource/subscription_event_updater.go:37, the value is clamped:maxConcurrentHandlers := max(s.hooks.MaxConcurrentOnReceiveHandlers, 1). This prevents undefined behavior without additional checks at the option level.However, explicit validation at the boundary aligns with patterns established elsewhere—for example,
router/core/graph_server.go:312–317validatesMaxConcurrentRoutines <= 0and rejects invalid values. If you'd like to adopt this pattern for consistency and clarity, the suggested clamping approach is reasonable. Otherwise, the current implementation is safe.router/core/factoryresolver.go (1)
504-508: Good wiring of max concurrency; consider clamping at the boundary.Looks correct. To avoid passing zero/negative values downstream, clamp here (e.g., default to 1 when <=0) so all call sites inherit sane limits.
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
44-48: Optional: avoid potential backpressure on errCh when many hooks error.If multiple hooks can error per subscription, errCh capacity len(subscriptions) might be tight. Consider buffering by len(subscriptions)*len(s.hooks.OnReceiveEvents) or start the deduper before launching workers.
Also applies to: 56-61
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
router-tests/modules/stream_publish_test.go(12 hunks)router-tests/modules/stream_receive_test.go(13 hunks)router-tests/modules/streams_hooks_combined_test.go(2 hunks)router/core/factoryresolver.go(1 hunks)router/core/router.go(1 hunks)router/core/router_config.go(1 hunks)router/core/supervisor_instance.go(2 hunks)router/demo.config.yaml(1 hunks)router/pkg/config/config.go(1 hunks)router/pkg/config/config.schema.json(1 hunks)router/pkg/config/fixtures/full.yaml(1 hunks)router/pkg/config/testdata/config_defaults.json(1 hunks)router/pkg/config/testdata/config_full.json(1 hunks)router/pkg/pubsub/datasource/hooks.go(1 hunks)router/pkg/pubsub/datasource/subscription_event_updater.go(4 hunks)router/pkg/pubsub/datasource/subscription_event_updater_test.go(8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: integration_test (./events)
- GitHub Check: integration_test (./telemetry)
- GitHub Check: build_test
🔇 Additional comments (18)
router/demo.config.yaml (1)
22-24: Demo config advertises the new knob.Thanks for surfacing the concurrency limit in the demo config so users discovering the feature see the default right away.
router/pkg/config/config.schema.json (1)
2277-2287: JSON schema covers subscription hooks.The schema entry looks consistent—minimum validation and descriptive text match the runtime behaviour.
router/pkg/config/testdata/config_defaults.json (1)
292-295: Defaults fixture includes the new limit.Good to see the defaults JSON updated so regression tests catch future drift.
router/pkg/config/testdata/config_full.json (1)
638-641: Full fixture exercises the knob.Including the populated setting in the “full” sample keeps documentation-style fixtures accurate.
router/pkg/config/fixtures/full.yaml (1)
324-329: YAML fixture mirrors the new setting.Thanks for updating the comprehensive fixture so examples remain aligned with the JSON defaults.
router/core/router_config.go (1)
30-34: Clamp zero max-concurrency.When a user omits
events.subscription_hooks,MaxConcurrentEventReceiveHandlerswill still deserialize as0. The new async updater limits throughput with this value, and a zero-capacity semaphore will block every handler. Please clamp to at least1(either right here when we store it or in the updater before building the semaphore) so existing configs keep working. If we already handle this elsewhere, let me know where.router/core/supervisor_instance.go (1)
255-256: Router options now receive hook limits.Glad to see the supervisor wiring pass the new SubscriptionHooks struct into the router—this keeps the option plumbing uniform.
router/pkg/pubsub/datasource/hooks.go (1)
17-21: Hook struct carries concurrency limit.Extending the hooks struct is the right spot; downstream providers can now respect the configured cap.
router-tests/modules/streams_hooks_combined_test.go (1)
37-41: Wiring looks goodConfig block and core.WithSubscriptionHooks usage are correct and keep tests explicit.
Also applies to: 86-86
router/pkg/pubsub/datasource/subscription_event_updater_test.go (1)
305-320: Hook pipeline tests read cleanThe renamed applyReceiveEventHooks tests cover happy path, chaining, and short‑circuit on error well.
Also applies to: 322-344, 346-367, 369-425, 427-478
router-tests/modules/stream_receive_test.go (2)
5-5: LGTM on importsAdditional fmt and sync/atomic are appropriate for the new tests.
Also applies to: 7-7
607-621: No refactoring required; code aligns with project's Go 1.25 requirementThe project declares
go 1.25in all go.mod files, which fully supports the Go 1.22+ features in the code (for i := range 3and per-iteration loop variable capture). Since the minimum Go version is 1.25—not 1.21—there is no need to make the code "1.21-safe." The code is already compliant with the project's declared toolchain.Likely an incorrect or invalid review comment.
router/pkg/config/config.go (1)
642-649: The review's technical claim is incorrect—envDefault without an env tag is properly applied by env.v11The TestConfigSlicesHaveDefaults test explicitly verifies that envDefault works without an env tag: fields like
ExporterandTemporalityhave onlyenvDefault(noenvtag), and the test assertions confirm the defaults are applied correctly.MaxConcurrentEventReceiveHandlers follows the same proven pattern used throughout the codebase (e.g., BatchTimeout, ExportTimeout, HTTPPath, HeaderName all use envDefault-only). According to the env.v11 library behavior, envDefault serves as a fallback when no environment variable is found; since fields without an env tag don't trigger an environment variable lookup in the first place, envDefault naturally applies.
The field is correctly designed and will receive the default value of 100 when the YAML omits subscription_hooks. The suggested fixes—adding an env tag or post-unmarshal normalization—are unnecessary based on how the library actually works. If you want to clarify semantics or improve test coverage, that's a separate concern, but the mechanism is sound.
Likely an incorrect or invalid review comment.
router-tests/modules/stream_publish_test.go (3)
31-35: LGTM: tests pass subscription hooks through config.Good to see hooks wired explicitly in tests.
47-48: LGTM: router gets SubscriptionHooks via option.
33-34: No build issue—constant is properly defined at package scope.The constant
defaultMaxConcurrentEventReceiveHandlersis defined inrouter-tests/modules/stream_receive_test.go(lines 28–30) at package level. In Go, unexported symbols (lowercase names) defined in one file are accessible throughout the entire package, includingstream_publish_test.go. Both files compile as part of the samerouter-tests/modulespackage, so no visibility or build break will occur.Likely an incorrect or invalid review comment.
router/pkg/pubsub/datasource/subscription_event_updater.go (2)
140-165: Nice: error dedup with useful context.Aggregating identical errors and logging counts with provider metadata is solid.
75-97: The review comment's core premise is incorrect:applyReceiveEventHooksis actively used in tests.The helper is called at 5 locations in the test file (lines 316, 340, 362, 396, 455), contradicting the claim that it's "currently unused." While the semantic difference in error handling is real—
applyReceiveEventHooksstops on first error whereasupdateSubscription's inline loop continues on error—the recommendation to delete the helper (Option A) is unsound because removing it would break the existing test coverage.The semantic divergence concern is valid, but should be addressed differently: either refactor the helper to match inline behavior, or keep it as a reusable abstraction for contexts requiring fail-fast semantics (as tests demonstrate).
Likely an incorrect or invalid review comment.
alepane21
left a comment
There was a problem hiding this comment.
Looks good to me! Maybe there is still some margin for improvement in the test, but I would say that we invested enough in it.
Good job!
Summary by CodeRabbit
New Features
max_concurrent_event_receive_handlersto control concurrent subscription event receive processing.Bug Fixes
Tests
Checklist
This is a change for Cosmo Streams.
Changed the execution of
OnReceiveHandlerfrom sequentiel to asynchronous. The number of concurrent handlers running is limited by a semaphore, which is confurable via a new config parameterevents.subscription_hooks.max_concurrent_event_receive_handlers. The default is max 100 concurrent handlers.I found a problem unrelated to this change during development: If all handlers return an error, all errors get logged. When having
xactive subscriptions and the hook errors for each, which I suppose could be quiet common, than we logxamount of error logs. I came with a error message deduplication logic: Based on the errorsError()we deduplicate logs while handlers are running and once all handlers have finished we log them.The behaviour of using handlers hasn't changed. From a users POV he can use the handler as he has before.