Skip to content
Merged
4 changes: 2 additions & 2 deletions demo/pkg/subgraphs/availability/subgraph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions demo/pkg/subgraphs/mood/subgraph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions router-tests/modules/start_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func TestStartSubscriptionHook(t *testing.T) {
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdatedMyKafka" {
return nil
}
ctx.WriteEvent(&kafka.Event{
ctx.WriteEvent((&kafka.MutableEvent{
Key: []byte("1"),
Data: []byte(`{"id": 1, "__typename": "Employee"}`),
})
}))
return nil
},
},
Expand Down Expand Up @@ -266,9 +266,9 @@ func TestStartSubscriptionHook(t *testing.T) {
if employeeId != 1 {
return nil
}
ctx.WriteEvent(&kafka.Event{
ctx.WriteEvent((&kafka.MutableEvent{
Data: []byte(`{"id": 1, "__typename": "Employee"}`),
})
}))
return nil
},
},
Expand Down Expand Up @@ -510,9 +510,7 @@ func TestStartSubscriptionHook(t *testing.T) {
Modules: map[string]interface{}{
"startSubscriptionModule": start_subscription.StartSubscriptionModule{
Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
ctx.WriteEvent(&core.EngineEvent{
Data: []byte(`{"data":{"countEmp":1000}}`),
})
ctx.WriteEvent(core.MutableEngineEvent([]byte(`{"data":{"countEmp":1000}}`)))
return nil
},
},
Expand Down
4 changes: 2 additions & 2 deletions router-tests/modules/stream-publish/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const myModuleID = "publishModule"

type PublishModule struct {
Logger *zap.Logger
Callback func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
Callback func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
}

func (m *PublishModule) Provision(ctx *core.ModuleContext) error {
Expand All @@ -21,7 +21,7 @@ func (m *PublishModule) Provision(ctx *core.ModuleContext) error {
return nil
}

func (m *PublishModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
func (m *PublishModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
m.Logger.Info("Publish Hook has been run")

if m.Callback != nil {
Expand Down
4 changes: 2 additions & 2 deletions router-tests/modules/stream-receive/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const myModuleID = "streamReceiveModule"

type StreamReceiveModule struct {
Logger *zap.Logger
Callback func(ctx core.StreamReceiveEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error)
Callback func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
}

func (m *StreamReceiveModule) Provision(ctx *core.ModuleContext) error {
Expand All @@ -21,7 +21,7 @@ func (m *StreamReceiveModule) Provision(ctx *core.ModuleContext) error {
return nil
}

func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
m.Logger.Info("Stream Hook has been run")

if m.Callback != nil {
Expand Down
100 changes: 65 additions & 35 deletions router-tests/modules/stream_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
"strconv"
"sync/atomic"
"testing"
"time"

Expand All @@ -23,6 +24,53 @@ import (
func TestPublishHook(t *testing.T) {
t.Parallel()

t.Run("Test Publish hook can't assert to mutable types", func(t *testing.T) {
t.Parallel()

var taPossible atomic.Bool
taPossible.Store(true)

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
for _, evt := range events.All() {
_, ok := evt.(datasource.MutableStreamEvent)
if !ok {
taPossible.Store(false)
}
}
return events, nil
},
},
},
}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
RouterOptions: []core.Option{
core.WithModulesConfig(cfg.Modules),
core.WithCustomModules(&stream_publish.PublishModule{}),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.InfoLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":false}}}`, resOne.Body)

requestLog := xEnv.Observer().FilterMessage("Publish Hook has been run")
assert.Len(t, requestLog.All(), 1)

assert.False(t, taPossible.Load(), "invalid type assertion was possible")
})
})

t.Run("Test Publish hook is called", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -55,25 +103,13 @@ func TestPublishHook(t *testing.T) {
})
})

t.Run("Test Publish kafka hook allows to set headers", func(t *testing.T) {
t.Run("Test Publish hook is called with mutable event", func(t *testing.T) {
t.Parallel()

cfg := config.Config{
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
for _, event := range events {
evt, ok := event.(*kafka.Event)
if !ok {
continue
}
evt.Headers["x-test"] = []byte("test")
}

return events, nil
},
},
"publishModule": stream_publish.PublishModule{},
},
}

Expand All @@ -89,21 +125,13 @@ func TestPublishHook(t *testing.T) {
LogLevel: zapcore.InfoLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
events.KafkaEnsureTopicExists(t, xEnv, time.Second, "employeeUpdated")
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":true}}}`, resOne.Body)
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":false}}}`, resOne.Body)

requestLog := xEnv.Observer().FilterMessage("Publish Hook has been run")
assert.Len(t, requestLog.All(), 1)

records, err := events.ReadKafkaMessages(xEnv, time.Second, "employeeUpdated", 1)
require.NoError(t, err)
require.Len(t, records, 1)
header := records[0].Headers[0]
require.Equal(t, "x-test", header.Key)
require.Equal(t, []byte("test"), header.Value)
})
})

Expand All @@ -114,7 +142,7 @@ func TestPublishHook(t *testing.T) {
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
return events, core.NewHttpGraphqlError("test", http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
},
},
Expand Down Expand Up @@ -159,7 +187,7 @@ func TestPublishHook(t *testing.T) {
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
return events, core.NewHttpGraphqlError("test", http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
},
},
Expand Down Expand Up @@ -213,7 +241,7 @@ func TestPublishHook(t *testing.T) {
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
return events, core.NewHttpGraphqlError("test", http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
},
},
Expand Down Expand Up @@ -257,26 +285,28 @@ func TestPublishHook(t *testing.T) {
Graph: config.Graph{},
Modules: map[string]interface{}{
"publishModule": stream_publish.PublishModule{
Callback: func(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployeeMyKafka" {
return events, nil
}

employeeID := ctx.Operation().Variables().GetInt("employeeID")

newEvents := []datasource.StreamEvent{}
for _, event := range events {
evt, ok := event.(*kafka.Event)
newEvents := make([]datasource.StreamEvent, 0, events.Len())
for _, event := range events.All() {
newEvt, ok := event.Clone().(*kafka.MutableEvent)
if !ok {
continue
}
if evt.Headers == nil {
evt.Headers = map[string][]byte{}
newEvt.SetData([]byte(`{"__typename":"Employee","id": 3,"update":{"name":"foo"}}`))
if newEvt.Headers == nil {
newEvt.Headers = map[string][]byte{}
}
evt.Headers["x-employee-id"] = []byte(strconv.Itoa(employeeID))
newEvents = append(newEvents, event)
newEvt.Headers["x-employee-id"] = []byte(strconv.Itoa(employeeID))
newEvents = append(newEvents, newEvt)
}
return newEvents, nil

return datasource.NewStreamEvents(newEvents), nil
},
},
},
Expand Down
Loading
Loading