
feat: batch and stream hooks #2087

Merged
dkorittki merged 198 commits into topic/streams-v1 from ale/eng-7601-add-streambatcheventhook-and-streampublisheventhook
Oct 10, 2025

Conversation

@alepane21
Contributor

@alepane21 alepane21 commented Jul 28, 2025

Summary by CodeRabbit

  • New Features

    • Added support for custom event hooks during stream publishing and batch processing, enabling advanced filtering and modification of events in real time.
    • Introduced new modules and utilities for enhanced Kafka and GraphQL subscription testing, including configurable batch and publish hooks.
  • Refactor

    • Unified and extended hook management for pub/sub providers, allowing more flexible and centralized event handling.
    • Improved configuration and provider management for Kafka, NATS, and Redis integrations, simplifying adapter usage and setup.
    • Changed internal provider collections from slices to maps for efficient access and management.
  • Bug Fixes

    • Enhanced error handling and type safety for event publishing and subscription flows across all supported providers.
    • Added error propagation and subscription closure handling in event update flows.
    • Improved logging and error messages for pub/sub operations.
  • Tests

    • Expanded and updated test coverage for batch and publish hooks, and improved test utilities for Kafka, NATS, and Redis.
    • Replaced local Kafka and Redis test helpers with centralized utility functions for consistency.
    • Updated mocks to align with new interfaces and method signatures, removing deprecated mocks.
  • Chores

    • Removed unused mock implementations and streamlined mock usage across the codebase.
    • Centralized Kafka and Redis event helper functions into a shared package for reuse.

How to use the new hooks

The hooks implemented in this PR are StreamBatchEventHook and StreamPublishEventHook.
StreamBatchEventHook is defined here:

type StreamBatchEventHook interface {
	// OnStreamEvents is called with each batch of events received from the
	// broker and returns the (possibly filtered or modified) batch to deliver.
	OnStreamEvents(ctx StreamBatchEventHookContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
}

StreamPublishEventHook is defined here:

type StreamPublishEventHook interface {
	// OnPublishEvents is called with each batch of events before they are
	// published to the broker and returns the batch to publish.
	OnPublishEvents(ctx StreamPublishEventHookContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
}

Demo StreamBatchEventHook

Subscription

On the demo graph, we will create a module that filters out messages based on a Kafka header.

employeeUpdatedMyKafka(employeeID: $employeeID)

Module code that implements the new hook:

const myModuleID = "streamBatchModule"

type StreamBatchModule struct {}

func (m *StreamBatchModule) OnStreamEvents(ctx core.StreamBatchEventHookContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
	if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdatedMyKafka" {
		return events, nil
	}
	newEvents := []datasource.StreamEvent{}
	for _, event := range events {
		evt, ok := event.(*kafka.Event)
		if !ok {
			continue
		}
		// Only forward events explicitly addressed to external clients.
		if string(evt.Headers["destination"]) != "external-clients" {
			continue
		}
		newEvents = append(newEvents, event)
	}

	return newEvents, nil
}

func (m *StreamBatchModule) Module() core.ModuleInfo {
	return core.ModuleInfo{
		// This is the ID of your module, it must be unique
		ID: myModuleID,
		// The priority of your module, lower numbers are executed first
		Priority: 1,
		New: func() core.Module {
			return &StreamBatchModule{}
		},
	}
}
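The filtering step in OnStreamEvents can be exercised in isolation. The sketch below uses a stand-in Event type (not the real kafka.Event) to show the same header check over a batch; events without the expected "destination" header are dropped:

```go
package main

import "fmt"

// Event is a stand-in for kafka.Event: just headers and a payload.
type Event struct {
	Headers map[string][]byte
	Data    string
}

// filterByDestination keeps only events whose "destination" header matches
// the wanted value, mirroring the hook's filtering logic.
func filterByDestination(events []Event, wanted string) []Event {
	kept := []Event{}
	for _, evt := range events {
		// Indexing a nil map yields a nil slice, so events without
		// headers are dropped as well.
		if string(evt.Headers["destination"]) != wanted {
			continue
		}
		kept = append(kept, evt)
	}
	return kept
}

func main() {
	batch := []Event{
		{Headers: map[string][]byte{"destination": []byte("external-clients")}, Data: "a"},
		{Headers: map[string][]byte{"destination": []byte("internal")}, Data: "b"},
		{Headers: nil, Data: "c"},
	}
	kept := filterByDestination(batch, "external-clients")
	fmt.Println(len(kept), kept[0].Data) // 1 a
}
```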

Demo StreamPublishEventHook

Mutation

On the demo graph, we will create a module that adds the $employeeId value to a Kafka header.

updateEmployeeMyKafka(employeeID: $employeeId, update: $update)

Module code that implements the new hook:

const myModuleID = "publishModule"

type PublishModule struct {}

func (m *PublishModule) OnPublishEvents(ctx core.StreamPublishEventHookContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
	if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployeeMyKafka" {
		return events, nil
	}

	// Look up the $employeeId variable of the operation.
	employeeID := ctx.RequestContext().Operation().Variables().GetInt("employeeId")

	newEvents := []datasource.StreamEvent{}
	for _, event := range events {
		evt, ok := event.(*kafka.Event)
		if !ok {
			continue
		}
		if evt.Headers == nil {
			evt.Headers = map[string][]byte{}
		}
		evt.Headers["x-employee-id"] = []byte(strconv.Itoa(employeeID))
		newEvents = append(newEvents, event)
	}
	return newEvents, nil
}

func (m *PublishModule) Module() core.ModuleInfo {
	return core.ModuleInfo{
		// This is the ID of your module, it must be unique
		ID: myModuleID,
		// The priority of your module, lower numbers are executed first
		Priority: 1,
		New: func() core.Module {
			return &PublishModule{}
		},
	}
}
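The header-injection step can likewise be sketched standalone. Event here is a stand-in for kafka.Event, and the header name mirrors the module above:

```go
package main

import (
	"fmt"
	"strconv"
)

// Event is a stand-in for kafka.Event.
type Event struct {
	Headers map[string][]byte
}

// tagEmployeeID writes the employee ID into an x-employee-id header,
// initializing the header map when the event has none.
func tagEmployeeID(events []*Event, employeeID int) {
	for _, evt := range events {
		if evt.Headers == nil {
			evt.Headers = map[string][]byte{}
		}
		evt.Headers["x-employee-id"] = []byte(strconv.Itoa(employeeID))
	}
}

func main() {
	events := []*Event{{}, {Headers: map[string][]byte{"k": []byte("v")}}}
	tagEmployeeID(events, 3)
	fmt.Println(string(events[0].Headers["x-employee-id"])) // 3
}
```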

Checklist

Contributor

@dkorittki dkorittki left a comment


I have reviewed all non-test code first because of the size of this PR. I think it makes sense to discuss this first; the tests may have to be adjusted afterwards.

I have a general question about the current method of pushing a message batch to brokers. In this approach we abort on the first problem, so some messages in the batch are sent to the broker while others are not. Is this a change from how EDFS did it before? Generally I would expect batch events to be sent transactionally, but I am lacking the historical context here. I just wanted to bring this topic up for discussion.

@dkorittki
Contributor

I believe the StreamHookError type in the ADR is not what's in the code:

In the ADR:

// StreamHookError is used to customize the error messages and the behavior
type StreamHookError struct {
    HttpError core.HttpError
    CloseSubscription bool
}

In the code:

type StreamHookError struct {
	err        error
	message    string
	statusCode int
	code       string
}

File: router/core/subscriptions_modules.go

@dkorittki dkorittki requested a review from endigma as a code owner October 1, 2025 14:19
Comment thread router-tests/modules/stream_publish_test.go
requestLog2 := xEnv.Observer().FilterMessage("error applying publish event hooks")
assert.Len(t, requestLog2.All(), 1)

require.Len(t, records, 1)
Contributor


Also check the response body here, like you did in the NATS test?

@dkorittki dkorittki changed the base branch from ale/eng-7600-add-subscriptiononstarthandler to topic/streams-v1 October 9, 2025 11:40

Resolved in favor of ale/eng-7601-add-streambatcheventhook-and-streampublisheventhook because this branch is based on topic/streams-v1 but changed code afterwards. The reason this is a merge conflict is that a squash-merge commit on topic/streams-v1 made these changes look newer than the ones on this branch.
