feat: distinguish between read / write events (Cosmo Streams)#2304

Merged
dkorittki merged 9 commits into topic/streams-v1 from
dominik/eng-8358-distinguish-between-readonly-events-and-readwrite-events
Oct 30, 2025

Contributor

@dkorittki dkorittki commented Oct 26, 2025

Summary by CodeRabbit

  • Refactor
    • Internal event handling system refactored to support mutable event operations throughout the pub/sub pipeline, enabling in-flight event transformation in subscription hooks and adapters while maintaining consistency across all supported transports.

Checklist

Motivation

This pull request is a draft to discuss how to avoid the many allocations made whenever we call the OnReceiveEvents hook. We currently copy events to isolate changes between subscribers on that hook, so that a change made in one subscriber's hook does not affect the others. The downside is that we must allocate memory for every event for every subscriber. The question is: how many hooks will actually modify the event slice items? Only then do we need a deep copy of them. If the hooks don't modify them, we allocate a lot of memory for nothing.

For now, this PR provides only the code changes, without touching tests. It is meant to show the idea; once that has settled, I will make corrections and adjust the tests.

The idea

Don't copy events before calling the OnReceiveEvents hook. Instead, make users aware via type and method names, godoc, and the Cosmo docs that they need to clone events before modifying them, to prevent data races. They can use the original events object to read events (and avoid allocations along the way) as long as they don't touch the actual event items themselves.

The design

Thanks to @alepane21 for this idea!

Instead of providing events as a slice to the hook, we provide a new type datasource.StreamEvents to hooks:

OnPublishEvents(
    ctx core.StreamPublishEventHandlerContext,
    events datasource.StreamEvents)
(datasource.StreamEvents, error)

You cannot access the slice directly, because we hide the slice in a private struct field.

func (m *CosmoStreamsModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
    // this fails at compile time
    for i := range events { // cannot range over 'events'
        events[i] = myNewEvent // 'events' is not indexable
    }
    return events, nil
}

To access it you have two options:

events.All()

An allocation-free iterator without the option to overwrite slice elements.
Good for read access to events.

func (m *CosmoStreamsModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
    for i, evt := range events.All() {
        evt = &kafka.Event{} // ineffective assignment: only rebinds the loop variable
        events.All()[i] = &kafka.Event{} // compile-time error: the iterator is not indexable
        data := evt.GetData() // returns a copy of the data, so it is thread safe
    }
    return events, nil
}
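The "returns a copy" behaviour noted in the comment above can be illustrated in isolation. This is a sketch under assumptions: `readOnlyEvent` is a hypothetical type, not the real kafka.Event, and it only shows why handing out a copy keeps the stored bytes safe from callers.

```go
package main

import (
	"bytes"
	"fmt"
)

// readOnlyEvent is a hypothetical read-only event wrapper.
type readOnlyEvent struct {
	data []byte
}

// GetData returns a copy of the payload, so writes to the returned slice
// never reach the bytes held by the event.
func (e *readOnlyEvent) GetData() []byte {
	return bytes.Clone(e.data)
}

func main() {
	evt := &readOnlyEvent{data: []byte("hello")}

	d := evt.GetData()
	d[0] = 'J' // modifies only the caller's copy

	fmt.Printf("copy=%s stored=%s\n", d, evt.GetData())
}
```

The trade-off is that every call to such a getter allocates, so a hook that only inspects events pays per read rather than per subscriber.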

To modify the events in a thread-safe way, a hook user has to do this:

Note

We are working on making this easier and more straightforward.

func (m *CosmoStreamsModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
    newEvents := make([]datasource.StreamEvent, 0, events.Len())

    for _, evt := range events.All() {
        newEvt := evt.GetUnsafeEvent().Clone()
        newEvt.SetData([]byte("new event data"))
        newEvents = append(newEvents, kafka.NewEvent(newEvt.(*kafka.UnsafeEvent)))
    }

    return datasource.NewStreamEvents(newEvents), nil
}
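Why cloning before modifying isolates subscribers can be shown with a stripped-down sketch. The `mutableEvent` type and `rewriteHook` function below are hypothetical and stand in for the real event types and hook signature; only the clone-then-modify pattern is taken from the example above.

```go
package main

import (
	"bytes"
	"fmt"
)

// mutableEvent is a hypothetical mutable event, used only for illustration.
type mutableEvent struct {
	data []byte
}

// Clone returns a deep copy that shares no memory with the original.
func (e *mutableEvent) Clone() *mutableEvent {
	return &mutableEvent{data: bytes.Clone(e.data)}
}

// SetData replaces the payload of this event.
func (e *mutableEvent) SetData(d []byte) { e.data = d }

// GetData returns a copy of the payload.
func (e *mutableEvent) GetData() []byte { return bytes.Clone(e.data) }

// rewriteHook mimics a hook that wants to change events: it clones each
// shared event first and modifies only the clone.
func rewriteHook(shared []*mutableEvent) []*mutableEvent {
	out := make([]*mutableEvent, 0, len(shared))
	for _, evt := range shared {
		c := evt.Clone()
		c.SetData(append([]byte("rewritten: "), evt.GetData()...))
		out = append(out, c)
	}
	return out
}

func main() {
	shared := []*mutableEvent{{data: []byte("one")}, {data: []byte("two")}}
	modified := rewriteHook(shared)

	// The shared events are untouched; other subscribers still see them as-is.
	fmt.Printf("shared=%s modified=%s\n", shared[0].GetData(), modified[0].GetData())
}
```

Only hooks that actually rewrite events pay for the copies, which is the allocation saving the PR is after.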

events.UnsafeStreamEvents()

There is a way to access the underlying slice directly for those who really want to,
for example when they need indexable read access to the slice.

func (m *CosmoStreamsModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
    unsafeEvents := events.UnsafeStreamEvents()
    iReallyNeedTheSecondElement := unsafeEvents[1] // index 1 is the second element
    return events, nil
}

The name implies that doing this is dangerous. It should prompt users to check the godoc and see that this can cause race conditions if data is modified.
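The hazard behind the "unsafe" name can be sketched with hypothetical types. In the example below, `event`, `streamEvents`, and `Unsafe` are assumptions for illustration, not the router's API. The point is that the accessor returns the backing slice itself, so a write by one holder is visible to every other holder of the same collection.

```go
package main

import "fmt"

// event is a hypothetical event type for this sketch.
type event struct {
	data string
}

// streamEvents is a hypothetical collection wrapper.
type streamEvents struct {
	evts []*event
}

// Unsafe returns the backing slice itself, not a copy.
func (s streamEvents) Unsafe() []*event { return s.evts }

func main() {
	shared := streamEvents{evts: []*event{{data: "a"}, {data: "b"}}}

	// Two subscribers receive the same collection value. Copying the struct
	// copies the slice header only, so both point at the same backing array.
	subscriberA := shared
	subscriberB := shared

	subscriberA.Unsafe()[1] = &event{data: "changed by A"}

	// Subscriber B observes A's write.
	fmt.Println(subscriberB.Unsafe()[1].data)
}
```

This sketch runs sequentially; if the two subscribers were separate goroutines, the same write would be a data race, which Go's race detector (`go run -race`) is designed to report.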

Contributor

coderabbitai Bot commented Oct 26, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The PR introduces mutable event handling and collection-based APIs across the pub/sub system. Events are now wrapped via MutableEvent to enable in-place mutations, and event handler interfaces changed from slice-based to collection-based signatures using a new datasource.StreamEvents type. Changes propagate through demo subgraphs, router core, test modules, and all pub/sub adapter implementations.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Core Datasource Types: router/pkg/pubsub/datasource/provider.go | Introduced StreamEvents collection wrapper with iterator methods (All(), Len(), Unsafe()) and MutableStreamEvent interface; updated StreamEvent.Clone() to return MutableStreamEvent. |
| Demo Subgraph Event Publishing: demo/pkg/subgraphs/availability/subgraph/schema.resolvers.go, demo/pkg/subgraphs/mood/subgraph/schema.resolvers.go | Replaced nats.Event with nats.MutableEvent in two publish calls per file without changing payload or control flow. |
| Kafka Pub/Sub Implementation: router/pkg/pubsub/kafka/engine_datasource.go, router/pkg/pubsub/kafka/adapter.go, router/pkg/pubsub/kafka/engine_datasource_factory.go, router/pkg/pubsub/kafka/engine_datasource_test.go | Introduced Event wrapper and MutableEvent with accessors; event emission now wraps data inside Event{evt: *MutableEvent}; Publish path uses Clone().(*MutableEvent) for deserialization. |
| NATS Pub/Sub Implementation: router/pkg/pubsub/nats/engine_datasource.go, router/pkg/pubsub/nats/adapter.go, router/pkg/pubsub/nats/engine_datasource_factory.go, router/pkg/pubsub/nats/engine_datasource_test.go | Introduced Event wrapper and MutableEvent with delegate methods; repurposed publishData.Event to hold MutableEvent; updated subscription and publish paths to wrap events via Event{evt: *MutableEvent}. |
| Redis Pub/Sub Implementation: router/pkg/pubsub/redis/engine_datasource.go, router/pkg/pubsub/redis/adapter.go, router/pkg/pubsub/redis/engine_datasource_factory.go, router/pkg/pubsub/redis/engine_datasource_test.go | Introduced Event wrapper and MutableEvent with data storage; updated Subscribe to emit Event{evt: *MutableEvent} and Publish to use Clone().(*MutableEvent) for extraction. |
| Router Core Hook Types: router/core/subscriptions_modules.go | Introduced MutableEngineEvent type and updated EngineEvent.data field from []byte to MutableEngineEvent; changed StreamReceiveEventHandler and StreamPublishEventHandler interfaces to accept/return datasource.StreamEvents instead of slices. |
| Router Configuration: router/core/router_config.go | Updated subscriptionHooks.onPublishEvents and subscriptionHooks.onReceiveEvents parameter and return types from []datasource.StreamEvent to datasource.StreamEvents. |
| Test Module: Stream Publish: router-tests/modules/stream-publish/module.go, router-tests/modules/stream_publish_test.go | Changed PublishModule.Callback and OnPublishEvents signatures from ([]datasource.StreamEvent) to (datasource.StreamEvents); updated test callbacks to use events.All(), events.Clone(), and return datasource.NewStreamEvents(). |
| Test Module: Stream Receive: router-tests/modules/stream-receive/module.go, router-tests/modules/stream_receive_test.go | Changed StreamReceiveModule.Callback and OnReceiveEvents signatures from ([]datasource.StreamEvent) to (datasource.StreamEvents); added Logger assignment in Provision; updated callbacks to iterate via events.All() and return via NewStreamEvents(). |
| Integration Tests: router-tests/modules/start_subscription_test.go, router-tests/modules/streams_hooks_combined_test.go | Replaced event type assertions from kafka.Event/core.EngineEvent to kafka.MutableEvent/core.MutableEngineEvent in WriteEvent calls; updated combined hook test callbacks to clone and mutate events via MutableEvent. |
| Datasource Test Infrastructure: router/pkg/pubsub/datasource/pubsubprovider_test.go, router/pkg/pubsub/datasource/subscription_event_updater.go, router/pkg/pubsub/datasource/subscription_event_updater_test.go | Introduced mutableTestEvent wrapper type in tests; removed copyEvents helper and direct event copying; updated tests to reflect hook-based transformations and passthrough behavior with new datasource.StreamEvents interface. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

  • Areas requiring extra attention:
    • Event wrapper/delegation patterns across all three pub/sub adapters (Kafka, NATS, Redis): verify Event{evt: *MutableEvent} construction is consistent and Clone() implementations are correct
    • Collection-based API migration from slices to datasource.StreamEvents — ensure all callers properly use All(), Len(), Clone() and NewStreamEvents() constructor
    • Hook signature changes (StreamReceiveEventHandler, StreamPublishEventHandler) propagated through core, test modules, and router configuration — verify all implementations match the new interface
    • Test event setup changes (mutableTestEvent wrapper in provider and subscription tests) — validate test data consistency and expected behavior alignments
    • Mutability semantics in combined hooks test — ensure proper cloning and isolation between hook transformations

Possibly related PRs



github-actions Bot commented Oct 26, 2025

Router image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-58839b8453a2ea757d63acc09b7c399b38b482c3

Contributor Author

@dkorittki dkorittki left a comment

Nice work @alepane21! We could make some small tweaks here and there, but I do like this idea. I added some suggestions on how we could streamline the interface even further.

Comment thread router/pkg/pubsub/kafka/engine_datasource.go Outdated
Comment thread router/pkg/pubsub/kafka/engine_datasource.go
Comment thread router/pkg/pubsub/kafka/engine_datasource.go Outdated
Comment thread router/pkg/pubsub/kafka/engine_datasource.go Outdated
Comment thread router/core/subscriptions_modules.go Outdated
@dkorittki dkorittki changed the title from "feat: distinguish between read / write events" to "feat: distinguish between read / write events (Cosmo Streams)" Oct 28, 2025
Contributor Author

@coderabbitai help

Contributor

coderabbitai Bot commented Oct 29, 2025

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

CodeRabbit commands

These commands are invoked using PR/Issue comments.

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai evaluate custom pre-merge check --instructions <custom-checks-instructions> --name <custom-checks-title> [--mode <error|warning>] to validate the custom pre-merge checks instructions. Defaults to error when --mode is omitted.
    • @coderabbitai ignore pre-merge checks to override pre-merge checks and get an approval on PR.
    • @coderabbitai run pre-merge checks to run pre-merge checks on the pull request.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit configuration file (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, support, documentation and community

  • Visit our status page to check the current availability of CodeRabbit.
  • Create a ticket on our support page for assistance with any issues or questions.
  • Visit our documentation site for detailed information on how to use CodeRabbit.
  • Join our Discord community to connect with other users and get help from the community.
  • Follow us on X/Twitter for updates and announcements.

Contributor Author

@coderabbitai summary

Contributor

coderabbitai Bot commented Oct 29, 2025

✅ Actions performed

Summary regeneration triggered.

@dkorittki dkorittki marked this pull request as ready for review October 29, 2025 15:20
@dkorittki dkorittki requested review from JivusAyrus and removed request for a team, Noroth, StarpTech and jensneuse October 29, 2025 15:20
@dkorittki dkorittki removed request for a team, JivusAyrus, devsergiy and endigma October 29, 2025 15:20
Comment thread router/core/subscriptions_modules.go Outdated
Comment thread router/pkg/pubsub/datasource/provider.go
Comment thread router/pkg/pubsub/kafka/engine_datasource.go Outdated
Comment thread router/pkg/pubsub/redis/engine_datasource.go Outdated
Comment thread router/core/subscriptions_modules.go Outdated
Comment thread router/core/subscriptions_modules.go
@dkorittki dkorittki merged commit 4c7687d into topic/streams-v1 Oct 30, 2025
34 of 35 checks passed
@dkorittki dkorittki deleted the dominik/eng-8358-distinguish-between-readonly-events-and-readwrite-events branch October 30, 2025 08:44
2 participants