Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 127 additions & 203 deletions router-tests/modules/stream_receive_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package module_test

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -522,207 +522,6 @@ func TestReceiveHook(t *testing.T) {
})
})

t.Run("Test error deduplication with multiple subscriptions", func(t *testing.T) {
t.Parallel()

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"streamReceiveModule": stream_receive.StreamReceiveModule{
Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
return datasource.NewStreamEvents(nil), errors.New("deduplicated error")
},
},
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.ErrorLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
topics := []string{"employeeUpdated"}
events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...)

var subscriptionOne struct {
employeeUpdatedMyKafka struct {
ID float64 `graphql:"id"`
Details struct {
Forename string `graphql:"forename"`
Surname string `graphql:"surname"`
} `graphql:"details"`
} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
}

surl := xEnv.GraphQLWebSocketSubscriptionURL()

// Create 3 subscriptions that will all receive the same error
clients := make([]*graphql.SubscriptionClient, 3)
clientRunChs := make([]chan error, 3)

for i := range 3 {
clients[i] = graphql.NewSubscriptionClient(surl)
clientRunChs[i] = make(chan error)

subscriptionID, err := clients[i].Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
return nil
})
require.NoError(t, err)
require.NotEmpty(t, subscriptionID)

go func() {
clientRunChs[i] <- clients[i].Run()
}()
}

// Wait for all subscriptions to be established
xEnv.WaitForSubscriptionCount(3, Timeout)

// Produce a message that will trigger the error in all handlers
events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`)

// Wait for all subscriptions to be closed due to the error
xEnv.WaitForSubscriptionCount(0, Timeout)

// Verify all clients completed
for i := 0; i < 3; i++ {
testenv.AwaitChannelWithT(t, Timeout, clientRunChs[i], func(t *testing.T, err error) {
require.NoError(t, err)
}, "client should have completed when server closed connection")
}

xEnv.WaitForTriggerCount(0, Timeout)

// Verify error deduplication: should see only one error log entry
errorLogs := xEnv.Observer().FilterMessage("some handlers have thrown an error")
assert.Len(t, errorLogs.All(), 1, "should have exactly one deduplicated error log entry")

// Verify the error log contains the correct error message and count
if len(errorLogs.All()) > 0 {
logEntry := errorLogs.All()[0]
fields := logEntry.ContextMap()

assert.Equal(t, "deduplicated error", fields["error"], "error message should match")
assert.Equal(t, int64(3), fields["amount_handlers"], "should count all 3 handlers that threw the error")
}
})
})

t.Run("Test unique error messages are all logged", func(t *testing.T) {
t.Parallel()

var errorCounter atomic.Int32

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"streamReceiveModule": stream_receive.StreamReceiveModule{
Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
count := errorCounter.Add(1)
return datasource.NewStreamEvents(nil), fmt.Errorf("unique error %d", count)
},
},
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.ErrorLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
topics := []string{"employeeUpdated"}
events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...)

var subscriptionOne struct {
employeeUpdatedMyKafka struct {
ID float64 `graphql:"id"`
Details struct {
Forename string `graphql:"forename"`
Surname string `graphql:"surname"`
} `graphql:"details"`
} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
}

surl := xEnv.GraphQLWebSocketSubscriptionURL()

// Create 3 subscriptions that will each receive a unique error
clients := make([]*graphql.SubscriptionClient, 3)
clientRunChs := make([]chan error, 3)

for i := range 3 {
clients[i] = graphql.NewSubscriptionClient(surl)
clientRunChs[i] = make(chan error)

subscriptionID, err := clients[i].Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
return nil
})
require.NoError(t, err)
require.NotEmpty(t, subscriptionID)

go func() {
clientRunChs[i] <- clients[i].Run()
}()
}

// Wait for all subscriptions to be established
xEnv.WaitForSubscriptionCount(3, Timeout)

// Produce a message that will trigger a unique error in each handler
events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`)

// Wait for all subscriptions to be closed due to the error
xEnv.WaitForSubscriptionCount(0, Timeout)

// Verify all clients completed
for i := range 3 {
testenv.AwaitChannelWithT(t, Timeout, clientRunChs[i], func(t *testing.T, err error) {
require.NoError(t, err)
}, "client should have completed when server closed connection")
}

xEnv.WaitForTriggerCount(0, Timeout)

// Verify no deduplication: should see three error log entries (one for each unique error)
errorLogs := xEnv.Observer().FilterMessage("some handlers have thrown an error")
assert.Len(t, errorLogs.All(), 3, "should have three separate error log entries for unique errors")

// Verify each error log contains a unique error message and count of 1
if len(errorLogs.All()) == 3 {
var errorMessages []string
for _, logEntry := range errorLogs.All() {
fields := logEntry.ContextMap()
errorMsg, ok := fields["error"].(string)
require.True(t, ok, "error field should be a string")

// Check that error message is unique (starts with "unique error")
assert.Contains(t, errorMsg, "unique error", "error message should contain 'unique error'")
assert.NotContains(t, errorMessages, errorMsg, "each error message should be unique")
errorMessages = append(errorMessages, errorMsg)

// Each unique error should have been thrown by exactly 1 handler
assert.Equal(t, int64(1), fields["amount_handlers"], "each unique error should have amount_handlers = 1")
}

// Verify we got exactly 3 unique error messages
assert.Len(t, errorMessages, 3, "should have exactly 3 unique error messages")
}
})
})

t.Run("Test concurrent handler execution works", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -813,7 +612,9 @@ func TestReceiveHook(t *testing.T) {
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
core.WithSubscriptionHooks(config.SubscriptionHooksConfiguration{
MaxConcurrentEventReceiveHandlers: tc.maxConcurrent,
OnReceiveEvents: config.OnReceiveEventsConfiguration{
MaxConcurrentHandlers: tc.maxConcurrent,
},
}),
},
LogObservation: testenv.LogObservationConfig{
Expand Down Expand Up @@ -893,4 +694,127 @@ func TestReceiveHook(t *testing.T) {
})
}
})

t.Run("Test timeout mechanism allows out-of-order event delivery", func(t *testing.T) {
t.Parallel()

// One subscriber receives three consecutive events.
// The first event's hook is delayed, exceeding the timeout.
// The second and third events' hooks process immediately without delay.
// Because the first hook exceeds the timeout, the system abandons waiting for it
// and processes the second and third events.
// The first event will be delivered later when its hook finally completes.
// This should result in event order [2, 3, 1] at the client.

hookDelay := 500 * time.Millisecond
hookTimeout := 100 * time.Millisecond

var callCount atomic.Int32

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"streamReceiveModule": stream_receive.StreamReceiveModule{
Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
// Only the first call should delay
if callCount.Add(1) == 1 {
time.Sleep(hookDelay)
}
return events, nil
},
},
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
core.WithSubscriptionHooks(config.SubscriptionHooksConfiguration{
OnReceiveEvents: config.OnReceiveEventsConfiguration{
MaxConcurrentHandlers: 3,
HandlerTimeout: hookTimeout,
},
}),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.InfoLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
topics := []string{"employeeUpdated"}
events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...)

var subscriptionOne struct {
employeeUpdatedMyKafka struct {
ID float64 `graphql:"id"`
Details struct {
Forename string `graphql:"forename"`
Surname string `graphql:"surname"`
} `graphql:"details"`
} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
}

surl := xEnv.GraphQLWebSocketSubscriptionURL()
client := graphql.NewSubscriptionClient(surl)

subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 3)
subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
subscriptionArgsCh <- kafkaSubscriptionArgs{
dataValue: dataValue,
errValue: errValue,
}
return nil
})
require.NoError(t, err)
require.NotEmpty(t, subscriptionOneID)

clientRunCh := make(chan error)
go func() {
clientRunCh <- client.Run()
}()

xEnv.WaitForSubscriptionCount(1, Timeout)

events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"first"}}`)
events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 2,"update":{"name":"second"}}`)
events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 3,"update":{"name":"third"}}`)

// Collect all 3 events
receivedIDs := make([]float64, 0, 3)
for i := 0; i < 3; i++ {
testenv.AwaitChannelWithT(t, Timeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) {
require.NoError(t, args.errValue)

var response struct {
EmployeeUpdatedMyKafka struct {
ID float64 `json:"id"`
} `json:"employeeUpdatedMyKafka"`
}
err := json.Unmarshal(args.dataValue, &response)
require.NoError(t, err)
receivedIDs = append(receivedIDs, response.EmployeeUpdatedMyKafka.ID)
})
}

require.NoError(t, client.Close())
testenv.AwaitChannelWithT(t, Timeout, clientRunCh, func(t *testing.T, err error) {
require.NoError(t, err)
}, "unable to close client before timeout")

// Verify events arrived out of order: event 1 should be the last one to arrive
assert.ElementsMatch(t, []float64{1, 2, 3}, receivedIDs, "expected to receive all events")
assert.Equal(t, float64(1), receivedIDs[len(receivedIDs)-1], "expected the delayed event to arrive last")
assert.NotEqual(t, float64(1), receivedIDs[0], "expected at least one later event to arrive before the delayed one")

timeoutLog := xEnv.Observer().FilterMessage("Timeout exceeded during subscription updates, events may arrive out of order")
assert.Len(t, timeoutLog.All(), 1, "expected timeout warning to be logged")

// Verify all hooks were executed
hookLog := xEnv.Observer().FilterMessage("Stream Hook has been run")
assert.Len(t, hookLog.All(), 3)
})
})
}
Loading
Loading