Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 374 additions & 2 deletions router-tests/modules/stream_receive_test.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions router/core/factoryresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,10 @@ func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nod
l.resolver.InstanceData().HostName,
l.resolver.InstanceData().ListenAddress,
pubsub_datasource.Hooks{
SubscriptionOnStart: subscriptionOnStartFns,
OnReceiveEvents: onReceiveEventsFns,
OnPublishEvents: onPublishEventsFns,
SubscriptionOnStart: subscriptionOnStartFns,
OnReceiveEvents: onReceiveEventsFns,
OnPublishEvents: onPublishEventsFns,
MaxConcurrentOnReceiveHandlers: l.subscriptionHooks.maxConcurrentOnReceiveHooks,
},
)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ func NewRouter(opts ...Option) (*Router, error) {
r.metricConfig = rmetric.DefaultConfig(Version)
}

// Default value for maxConcurrentOnReceiveHooks
if r.subscriptionHooks.maxConcurrentOnReceiveHooks == 0 {
r.subscriptionHooks.maxConcurrentOnReceiveHooks = 100
}

if r.corsOptions == nil {
r.corsOptions = CorsDefaultOptions()
}
Expand Down Expand Up @@ -2122,6 +2127,12 @@ func WithDemoMode(demoMode bool) Option {
}
}

func WithSubscriptionHooks(cfg config.SubscriptionHooksConfiguration) Option {
return func(r *Router) {
r.subscriptionHooks.maxConcurrentOnReceiveHooks = cfg.MaxConcurrentEventReceiveHandlers
}
}

type ProxyFunc func(req *http.Request) (*url.URL, error)

func newHTTPTransport(opts *TransportRequestOptions, proxy ProxyFunc, traceDialer *TraceDialer, subgraph string) *http.Transport {
Expand Down
7 changes: 4 additions & 3 deletions router/core/router_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

type subscriptionHooks struct {
onStart []func(ctx SubscriptionOnStartHandlerContext) error
onPublishEvents []func(ctx StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
onReceiveEvents []func(ctx StreamReceiveEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
onStart []func(ctx SubscriptionOnStartHandlerContext) error
onPublishEvents []func(ctx StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
onReceiveEvents []func(ctx StreamReceiveEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
maxConcurrentOnReceiveHooks int
}

type Config struct {
Expand Down
8 changes: 5 additions & 3 deletions router/core/supervisor_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package core
import (
"context"
"fmt"
"net/http"
"os"
"strings"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/dustin/go-humanize"
"github.com/wundergraph/cosmo/router/pkg/authentication"
Expand All @@ -13,9 +17,6 @@ import (
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/http"
"os"
"strings"
)

// newRouter creates a new router instance.
Expand Down Expand Up @@ -251,6 +252,7 @@ func optionsFromResources(logger *zap.Logger, config *config.Config) []Option {
WithMCP(config.MCP),
WithPlugins(config.Plugins),
WithDemoMode(config.DemoMode),
WithSubscriptionHooks(config.Events.SubscriptionHooks),
}

return options
Expand Down
2 changes: 1 addition & 1 deletion router/demo.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ events:
redis:
- id: my-redis
urls:
- "redis://localhost:6379/2"
- "redis://localhost:6379/2"
7 changes: 6 additions & 1 deletion router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,12 @@ type EventProviders struct {
}

type EventsConfiguration struct {
Providers EventProviders `yaml:"providers,omitempty"`
Providers EventProviders `yaml:"providers,omitempty"`
SubscriptionHooks SubscriptionHooksConfiguration `yaml:"subscription_hooks,omitempty"`
}

type SubscriptionHooksConfiguration struct {
MaxConcurrentEventReceiveHandlers int `yaml:"max_concurrent_event_receive_handlers" envDefault:"100"`
}

type Cluster struct {
Expand Down
13 changes: 13 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,19 @@
}
}
}
},
"subscription_hooks": {
"type": "object",
"description": "Configuration for subscription custom modules that are executed when events are received from a broker.",
"additionalProperties": false,
"properties": {
"max_concurrent_event_receive_handlers": {
"type": "integer",
"description": "The maximum number of concurrent event receive handlers. This controls the concurrency of the OnReceiveEvents custom modules.",
"minimum": 1,
"default": 100
}
}
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions router/pkg/config/fixtures/full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ events:
urls:
- 'redis://localhost:6379/11'
cluster_enabled: true
subscription_hooks:
max_concurrent_event_receive_handlers: 100

engine:
enable_single_flight: true
Expand Down
3 changes: 3 additions & 0 deletions router/pkg/config/testdata/config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@
"Nats": null,
"Kafka": null,
"Redis": null
},
"SubscriptionHooks": {
"MaxConcurrentEventReceiveHandlers": 100
}
},
"CacheWarmup": {
Expand Down
3 changes: 3 additions & 0 deletions router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,9 @@
"ClusterEnabled": true
}
]
},
"SubscriptionHooks": {
"MaxConcurrentEventReceiveHandlers": 100
}
},
"CacheWarmup": {
Expand Down
7 changes: 4 additions & 3 deletions router/pkg/pubsub/datasource/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type OnReceiveEventsFn func(ctx context.Context, subConf SubscriptionEventConfig

// Hooks contains hooks for the pubsub providers and data sources
type Hooks struct {
SubscriptionOnStart []SubscriptionOnStartFn
OnReceiveEvents []OnReceiveEventsFn
OnPublishEvents []OnPublishEventsFn
SubscriptionOnStart []SubscriptionOnStartFn
OnReceiveEvents []OnReceiveEventsFn
OnPublishEvents []OnPublishEventsFn
MaxConcurrentOnReceiveHandlers int
}
123 changes: 94 additions & 29 deletions router/pkg/pubsub/datasource/subscription_event_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datasource

import (
"context"
"sync"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"go.uber.org/zap"
Expand Down Expand Up @@ -31,34 +32,30 @@ func (s *subscriptionEventUpdater) Update(events []StreamEvent) {
}
return
}
// If there are hooks, we should apply them separated for each subscription
for ctx, subId := range s.eventUpdater.Subscriptions() {
processedEvents, err := applyStreamEventHooks(
ctx,
s.subscriptionEventConfiguration,
events,
s.hooks.OnReceiveEvents,
)
// updates the events even if the hooks fail
// if a hook doesn't want to send the events, it should return no events!
for _, event := range processedEvents {
s.eventUpdater.UpdateSubscription(subId, event.GetData())
}
if err != nil {
// For all errors, log them
if s.logger != nil {
s.logger.Error(
"An error occurred while processing stream events hooks",
zap.Error(err),
zap.String("provider_type", string(s.subscriptionEventConfiguration.ProviderType())),
zap.String("provider_id", s.subscriptionEventConfiguration.ProviderID()),
zap.String("field_name", s.subscriptionEventConfiguration.RootFieldName()),
)
}
// Always close the subscription when a hook reports an error to avoid inconsistent state.
s.eventUpdater.CloseSubscription(resolve.SubscriptionCloseKindNormal, subId)
}

subscriptions := s.eventUpdater.Subscriptions()
limit := max(s.hooks.MaxConcurrentOnReceiveHandlers, 1)
semaphore := make(chan struct{}, limit)
wg := sync.WaitGroup{}
errCh := make(chan error, len(subscriptions))

for ctx, subId := range subscriptions {
semaphore <- struct{}{} // Acquire a slot
eventsCopy := copyEvents(events)
wg.Add(1)
Comment thread
alepane21 marked this conversation as resolved.
go s.updateSubscription(ctx, &wg, errCh, semaphore, subId, eventsCopy)
}

doneLogging := make(chan struct{})
go func() {
s.deduplicateAndLogErrors(errCh, len(subscriptions))
doneLogging <- struct{}{}
}()

wg.Wait()
close(semaphore)
close(errCh)
<-doneLogging
}

func (s *subscriptionEventUpdater) Complete() {
Expand All @@ -73,9 +70,9 @@ func (s *subscriptionEventUpdater) SetHooks(hooks Hooks) {
s.hooks = hooks
}

// applyStreamEventHooks processes events through a chain of hook functions
// applyReceiveEventHooks processes events through a chain of hook functions
// Each hook receives the result from the previous hook, creating a proper middleware pipeline
func applyStreamEventHooks(
func applyReceiveEventHooks(
ctx context.Context,
cfg SubscriptionEventConfiguration,
events []StreamEvent,
Expand All @@ -97,6 +94,74 @@ func applyStreamEventHooks(
return currentEvents, nil
}

func copyEvents(in []StreamEvent) []StreamEvent {
out := make([]StreamEvent, len(in))
for i := range in {
out[i] = in[i].Clone()
}
return out
}

func (s *subscriptionEventUpdater) updateSubscription(ctx context.Context, wg *sync.WaitGroup, errCh chan error, semaphore chan struct{}, subID resolve.SubscriptionIdentifier, events []StreamEvent) {
defer wg.Done()
defer func() {
<-semaphore // Release the slot when done
}()

hooks := s.hooks.OnReceiveEvents

// modify events with hooks
var err error
for i := range hooks {
events, err = hooks[i](ctx, s.subscriptionEventConfiguration, events)
if err != nil {
errCh <- err
}
}

// send events to the subscription,
// regardless if there was an error during hook processing.
// If no events should be sent, hook must return no events.
for _, event := range events {
s.eventUpdater.UpdateSubscription(subID, event.GetData())
}

// In case there was an error we close the affected subscription.
if err != nil {
s.eventUpdater.CloseSubscription(resolve.SubscriptionCloseKindNormal, subID)
}
}

// deduplicateAndLogErrors collects errors from errCh
// and deduplicates them based on their err.Error() value.
// Afterwards it uses s.logger to log the message.
func (s *subscriptionEventUpdater) deduplicateAndLogErrors(errCh chan error, size int) {
if s.logger == nil {
return
}

errs := make(map[string]int, size)
for err := range errCh {
amount, found := errs[err.Error()]
if found {
errs[err.Error()] = amount + 1
continue
}
errs[err.Error()] = 1
}

for err, amount := range errs {
s.logger.Error(
"some handlers have thrown an error",
zap.String("error", err),
zap.Int("amount_handlers", amount),
zap.String("provider_type", string(s.subscriptionEventConfiguration.ProviderType())),
zap.String("provider_id", s.subscriptionEventConfiguration.ProviderID()),
zap.String("field_name", s.subscriptionEventConfiguration.RootFieldName()),
)
}
}

func NewSubscriptionEventUpdater(
cfg SubscriptionEventConfiguration,
hooks Hooks,
Expand Down
Loading
Loading