Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
198 commits
Select commit Hold shift + click to select a range
1d520ea
chore: add rfc
alepane21 Jul 10, 2025
290aa5f
Add some more description
alepane21 Jul 10, 2025
da3f6a1
feat: improved RFC base on feedback received
alepane21 Jul 11, 2025
82fed9f
chore: clean a bit structures
alepane21 Jul 11, 2025
1252790
chore: small fixes
alepane21 Jul 11, 2025
0008025
chore: fix indentations
alepane21 Jul 11, 2025
26bb7db
chore: small fixes
alepane21 Jul 11, 2025
d909dc0
chore: improved english flow
alepane21 Jul 11, 2025
d032b67
chore: fix wrong logic in an example
alepane21 Jul 11, 2025
9c69cdc
chore: change StreamOnEventFilter meaning of the returned boolean
alepane21 Jul 11, 2025
80f7097
chore: small fixes to example
alepane21 Jul 11, 2025
7969ec9
chore: address feedback
alepane21 Jul 12, 2025
5312f14
chore: limit filter example to inbound events
alepane21 Jul 12, 2025
259b2a0
chore: separate inbound and outbound hooks, added some complete examples
alepane21 Jul 13, 2025
23f3e19
chore: add some comments
alepane21 Jul 13, 2025
4d76c66
chore: moved interfaces and types to an appendix
alepane21 Jul 13, 2025
e35ac47
chore: improve example checks
alepane21 Jul 13, 2025
df62784
chore: improve data structure to expose ProviderId/Type and Specific …
alepane21 Jul 13, 2025
ace1c5c
chore: removed logic in the examples
alepane21 Jul 13, 2025
e6fc78f
chore: add proposal about how we could integrate AsyncAPI
alepane21 Jul 13, 2025
95a3941
core: add usage of FieldName()
alepane21 Jul 13, 2025
5adb360
chore: remove second example
alepane21 Jul 13, 2025
bfe2651
chore: explicit that the second async schema can be used by outside s…
alepane21 Jul 13, 2025
368413e
chore: change sent to published to be more consistent
alepane21 Jul 13, 2025
9015adf
chore: add enums for provider types
alepane21 Jul 13, 2025
cc83382
chore: remove Data and SetData, they break immutability!
alepane21 Jul 13, 2025
b5f188c
chore: FieldName -> RootFieldName
alepane21 Jul 14, 2025
68e7874
chore: improve data mapping description
alepane21 Jul 14, 2025
127a08a
chore: explicit that WriteEvent is writing event only to the current …
alepane21 Jul 14, 2025
083bf70
chore: explicit the WriteEvent behaviour also in the example
alepane21 Jul 14, 2025
ef36c76
chore: add an outlook on the Appendix 2 about a possible evolution
alepane21 Jul 14, 2025
93e4b04
chore: fixes example code to avoid panic on request not authenticated
alepane21 Jul 14, 2025
1fdeec3
chore: add explicit reference to Filtering hook that could be used fo…
alepane21 Jul 14, 2025
d38018e
chore: added description on the event filtering about authorization
alepane21 Jul 14, 2025
1ae023e
Merge branch 'main' into ale/eng-7470-rfc-cosmo-streams-v1-authorizat…
alepane21 Jul 14, 2025
f58970f
chore: add StreamHookError to allow error response customization from…
alepane21 Jul 14, 2025
8ac629c
chore: add more details and an example with kafka
alepane21 Jul 14, 2025
434c61c
chore: add an example on how to use the metadata field
alepane21 Jul 14, 2025
35e8077
fix: ignore marshal error
alepane21 Jul 14, 2025
fa562c9
chore: add PublishEventConfiguration, ProviderType and specialized ev…
alepane21 Jul 15, 2025
7ececf0
chore: remove local file
alepane21 Jul 15, 2025
93db159
Merge branch 'main' into ale/eng-7599-introduce-base-structures-to-pu…
alepane21 Jul 15, 2025
1ec4816
chore: use ProviderID()
alepane21 Jul 15, 2025
b938232
chore: update demo to use no Event structure
alepane21 Jul 15, 2025
f69b4cd
fix: align event configuration format
alepane21 Jul 15, 2025
74f32fc
fix: update MarshalJSONTemplate for new internal event format
alepane21 Jul 15, 2025
1649ef5
chore: remove duplicated test
alepane21 Jul 15, 2025
b83e4ef
chore: improve naming
alepane21 Jul 15, 2025
9ed94fd
chore: make the ADR from the approved RFC
alepane21 Jul 16, 2025
e61ad7a
Merge branch 'main' into ale/eng-7470-rfc-cosmo-streams-v1-authorizat…
alepane21 Jul 16, 2025
62a0882
chore: add an example to adr
alepane21 Jul 16, 2025
7d8a782
Merge branch 'ale/eng-7470-rfc-cosmo-streams-v1-authorization-init-ma…
alepane21 Jul 16, 2025
b4838be
Merge branch 'main' into ale/eng-7599-introduce-base-structures-to-pu…
alepane21 Jul 16, 2025
0e60603
feat: add SubscriptionOnStartHandler
alepane21 Jul 17, 2025
9fc67d2
Merge branch 'main' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 17, 2025
73c3fb2
feat: add WriteEvent to context
alepane21 Jul 17, 2025
8b3d17b
chore: add test, improve method signature
alepane21 Jul 18, 2025
1977d3a
Merge branch 'main' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 18, 2025
17984ce
feat: pass down additional event data to pubsub system
alepane21 Jul 18, 2025
0992ef6
chore: add SubscriptionEventUpdater to mockery
alepane21 Jul 18, 2025
6622459
chore: improved structure, add support for start hook also on subgrap…
alepane21 Jul 18, 2025
19ca6fd
chore: update deps
alepane21 Jul 19, 2025
79f05e5
chore: go mod tidy
alepane21 Jul 19, 2025
4b9c8a7
fix: properly escapes values
alepane21 Jul 19, 2025
9257565
chore: align MarshalJSONTemplate and improve the kafka one
alepane21 Jul 21, 2025
d6c6ec6
chore: small fix to MarshalJSONTemplate format
alepane21 Jul 21, 2025
a8b7f7d
chore: use the new subscription hooks on the engine
alepane21 Jul 22, 2025
18bee17
chore: update engine version
alepane21 Jul 22, 2025
bda2ed6
chore: tidy mods of router-tests
alepane21 Jul 22, 2025
5873131
Merge branch 'main' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 22, 2025
638c6f7
chore: hooks can close subscription and specify custom errors
alepane21 Jul 22, 2025
cf1c868
chore: WriteEvents now calls directly resolveCtx.EmitSubscriptionUpdate
alepane21 Jul 23, 2025
831222e
Merge remote-tracking branch 'origin/main' into ale/eng-7600-add-subs…
alepane21 Jul 23, 2025
89832b9
fix: minor typos on adr
alepane21 Jul 23, 2025
7b8239f
chore: aligned names in the engine with the ones here
alepane21 Jul 23, 2025
9c8609f
chore: use new engine hooks
alepane21 Jul 23, 2025
d6dd642
chore: update engine
alepane21 Jul 23, 2025
6ad6f59
chore: update engine
alepane21 Jul 23, 2025
c330e46
Merge branch 'main' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 23, 2025
c4dd131
fix: make CustomModuleError panic-free
alepane21 Jul 23, 2025
2e737da
fix: make NewPubSubOnSubscriptionStartHook and NewEngineOnSubscriptio…
alepane21 Jul 23, 2025
6ea1ee1
chore: clarify SubscriptionOnStartHookContext interface
alepane21 Jul 23, 2025
057a01f
chore: small coherency fixes
alepane21 Jul 23, 2025
73292a7
chore: small fixes and made names more coherent
alepane21 Jul 24, 2025
45a71e0
chore: add comments
alepane21 Jul 24, 2025
c58e79a
chore: add headers on kafka publish
alepane21 Jul 24, 2025
7c4a508
chore: use new TryEmitSubscriptionUpdate, added Headers on nats
alepane21 Jul 24, 2025
87faf10
chore: headers were not managed when nats was using channels
alepane21 Jul 24, 2025
4d4e675
Merge branch 'main' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 24, 2025
dee05a8
chore: initial working version of Batch and Stream hooks
alepane21 Jul 28, 2025
f7d9b42
chore: fixed behaviour when no modules are specified
alepane21 Jul 28, 2025
742d6ab
chore: fix tests
alepane21 Jul 28, 2025
036538b
chore: removed useless adapters
alepane21 Jul 28, 2025
2c52b39
chore: improved behaviour with multiple modules
alepane21 Jul 28, 2025
f9be6a1
chore: separate private from public data
alepane21 Jul 28, 2025
9f661f3
chore: update graphql-go-tools to v2.0.0-rc.213 and use new Subscript…
alepane21 Jul 29, 2025
cb6d326
Revert "chore: update graphql-go-tools to v2.0.0-rc.213 and use new S…
alepane21 Jul 29, 2025
bff8717
chore: update graphql-go-tools to v2.0.0-rc.213 and use new Subscript…
alepane21 Jul 29, 2025
21c6366
chore: improve adr
alepane21 Jul 29, 2025
057d0f0
Merge branch 'topic/streams-v1' into ale/eng-7600-add-subscriptionons…
alepane21 Jul 29, 2025
5a11bb6
chore: remove hookedprovider and embedded hooks logics inside pubsubp…
alepane21 Jul 29, 2025
e030e1c
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Jul 29, 2025
7f6d791
chore: better names
alepane21 Jul 30, 2025
c46dcf5
Merge remote-tracking branch 'origin/ale/eng-7601-add-streambatcheven…
alepane21 Jul 30, 2025
35aceb0
refactor: rename ProviderLifecycle to Lifecycle and replaced hookedda…
alepane21 Jul 30, 2025
83b5072
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Jul 30, 2025
85a1bdb
chore: add a description to PubSubSubscriptionDataSource
alepane21 Jul 30, 2025
27e2b84
chore: add PubSubSubscriptionDataSource tests
alepane21 Jul 30, 2025
ffbd209
chore: implement PR suggestions
alepane21 Jul 30, 2025
6cbc62d
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Jul 30, 2025
37ede74
chore: implement hooks logic in existing subscriptionEventUpdater ins…
alepane21 Jul 30, 2025
202ed52
chore: revert useless change
alepane21 Jul 30, 2025
ed22545
chore: implement suggestion
alepane21 Jul 31, 2025
d014b3a
chore: fix test
alepane21 Jul 31, 2025
8f6adbe
chore: use hooks instead of specifying single hooks
alepane21 Jul 31, 2025
98c5bd0
chore: better names
alepane21 Jul 31, 2025
7e36d3e
feat: manage error on update
alepane21 Jul 31, 2025
765897d
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Jul 31, 2025
75932b9
chore: use hooks on providers
alepane21 Aug 1, 2025
7bbec8f
chore: add tests
alepane21 Aug 1, 2025
82b5521
chore: update go.mod
alepane21 Aug 4, 2025
6ab1463
chore: go mod tidy
alepane21 Aug 4, 2025
957857b
chore: add subscription_event_updater tests
alepane21 Aug 4, 2025
4d93bd1
chore: change SubscriptionOnStartHandler to return only the error and…
alepane21 Aug 4, 2025
e309be2
chore: add what changes should be made to cosmo streams hooks with mo…
alepane21 Aug 4, 2025
cfc7a57
chore: add failing test
alepane21 Aug 5, 2025
fca32f6
chore: update engine
alepane21 Aug 5, 2025
61e0ddc
chore: update router
alepane21 Aug 5, 2025
cd9e4e4
chore: update engine
alepane21 Aug 6, 2025
1c4a1a9
chore: update router
alepane21 Aug 6, 2025
ca980ff
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Aug 6, 2025
d1f9af9
chore: add tests
alepane21 Aug 6, 2025
36dfd4f
chore: improve error behaviour of updater
alepane21 Aug 7, 2025
ceac255
chore: rename CloseConnection to CloseSubscription
alepane21 Aug 8, 2025
4865504
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Aug 8, 2025
266c096
chore: improved error on publish behaviour
alepane21 Aug 8, 2025
4d39d4c
chore: fix failing test with new logic
alepane21 Aug 8, 2025
79595ea
chore: correctly test redis
alepane21 Aug 8, 2025
2343e03
chore: improved Publish method
alepane21 Aug 8, 2025
3d829f8
chore: send updates even when an hook returns an error
alepane21 Aug 8, 2025
018e53c
fix: rootFieldName missing
alepane21 Aug 8, 2025
3df02fe
chore: fix tests
alepane21 Aug 8, 2025
511a172
chore: remove close option from subscription start hook
alepane21 Aug 19, 2025
e34b6fe
chore: use updated engine
alepane21 Aug 19, 2025
70ef951
chore: use update router
alepane21 Aug 19, 2025
fb6c1a4
chore: fix test for new behaviour when an error is returned
alepane21 Aug 19, 2025
103443b
chore: remove closeSubscription field from StreamHookError and update…
alepane21 Aug 20, 2025
384d027
chore: update to new engine
alepane21 Aug 20, 2025
8177f80
chore: update engine
alepane21 Aug 21, 2025
0614486
chore: update graphql-go-tools dependency to latest version
alepane21 Aug 21, 2025
f3c67c7
chore: update graphql-go-tools dependency to new release version
alepane21 Aug 21, 2025
4a8bcbd
chore: update router and graphql-go-tools dependencies to latest vers…
alepane21 Aug 21, 2025
aa67e6d
chore: make the SubscriptionOnStartHookContext more restricted
alepane21 Aug 22, 2025
5caae33
chore: fix compile issues
StarpTech Aug 23, 2025
0756819
chore: remove test module
alepane21 Aug 25, 2025
3ac47c4
chore: restore Authentication on SubscriptionOnStart context
alepane21 Aug 25, 2025
665269e
chore: update graphql-go-tools dependency to new release version
alepane21 Aug 25, 2025
806a9cf
chore: update graphql-go-tools dependency to latest release version
alepane21 Aug 25, 2025
31d35e9
chore: update graphql-go-tools dependency to new release version
alepane21 Aug 25, 2025
ba23381
chore: update router and graphql-go-tools dependencies to latest vers…
alepane21 Aug 25, 2025
6ee505c
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Aug 25, 2025
4a4fdd0
chore: upgrade engine
alepane21 Sep 3, 2025
14b5007
chore: upgrade deps
alepane21 Sep 3, 2025
d046fcd
chore: tidy modules
alepane21 Sep 3, 2025
bd4263a
chore: updage engine
alepane21 Sep 3, 2025
02e79be
chore: update router and graphql-go-tools dependencies to latest vers…
alepane21 Sep 3, 2025
7f93003
chore: go mod tidy
alepane21 Sep 3, 2025
90dba99
chore: update graphql-go-tools dependency to latest version
alepane21 Sep 9, 2025
e0583e8
chore: update router and graphql-go-tools dependencies to latest vers…
alepane21 Sep 9, 2025
564837a
chore: go mod tidy
alepane21 Sep 15, 2025
2af75cb
chore: update ADR to reflect implementation of SubscriptionOnStart
alepane21 Sep 16, 2025
2766ed7
chore: add test to verify module dependency
dkorittki Sep 16, 2025
3f703d8
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
alepane21 Sep 17, 2025
ca97bb8
chore: update go.mod to reflect go.sum
dkorittki Sep 18, 2025
5627260
refactor: rename StreamBatchEventHook to StreamReceiveEventHook and u…
alepane21 Sep 18, 2025
2932829
Merge remote-tracking branch 'origin/ale/eng-7601-add-streambatcheven…
alepane21 Sep 18, 2025
8d60929
refactor: rename OnStreamEvents to OnReceiveEvents and update related…
alepane21 Sep 18, 2025
f6fa44c
fix: use CanonicalHeaderKey for x-custom-header in streamReceiveModul…
alepane21 Sep 18, 2025
3cdb817
chore: add test for subscription closing via hook
dkorittki Sep 18, 2025
6dbbe94
Revert "chore: add test for subscription closing via hook"
dkorittki Sep 18, 2025
f945f97
chore: add test to verify subscription close via hook
dkorittki Sep 18, 2025
8e87e78
fix: close kafka clients + subscribers on topic poller exit
dkorittki Sep 22, 2025
6ce23a0
chore: now SubscriptionEventUpdater runs hooks separately for each ac…
alepane21 Sep 24, 2025
a235b91
chore: update graphql-go-tools dependency to v2.0.0-rc.213.0.20250924…
alepane21 Sep 24, 2025
b19956a
chore: fix S1019
alepane21 Sep 24, 2025
14e3c36
fix: test should use the new module id streamReceiveModule
alepane21 Sep 24, 2025
9a7e08f
fix: remove unnecessary Kafka connection log verification and streaml…
alepane21 Sep 24, 2025
d61dcde
chore: add a test to verify that StreamReceiveEventHook can access he…
alepane21 Sep 24, 2025
31a68cf
feat: enhance StreamReceiveEventHook documentation
alepane21 Sep 25, 2025
3a0653c
chore: fix events Clone method and remove useless redis Adapter
alepane21 Sep 30, 2025
9b0e12d
chore: rename hooks to handlers
dkorittki Sep 30, 2025
af3bb30
Merge branch 'topic/streams-v1' into ale/eng-7600-add-subscriptionons…
dkorittki Sep 30, 2025
57bb4d9
chore: go mod tidy
dkorittki Oct 1, 2025
516d86b
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
dkorittki Oct 1, 2025
f863f14
chore: remove last references to CloseSubscription, fix PR comments
alepane21 Oct 8, 2025
b975c60
chore: replace StreamHandlerError with already existing httpGraphqlError
alepane21 Oct 9, 2025
ce54927
chore: wrong clone in the EngineEvent
alepane21 Oct 9, 2025
a46d013
Merge branch 'topic/streams-v1' into ale/eng-7601-add-streambatcheven…
dkorittki Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 66 additions & 63 deletions adr/cosmo-streams-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,18 @@ The following interfaces will extend the existing logic in the custom modules.
These provide additional control over subscriptions by providing hooks, which are invoked during specific events.

- `SubscriptionOnStartHandler`: Called once at subscription start.
- `StreamBatchEventHook`: Called each time a batch of events is received from the provider.
- `StreamPublishEventHook`: Called each time a batch of events is going to be sent to the provider.
- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery.
- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider.

```go
// STRUCTURES TO BE ADDED TO PUBSUB PACKAGE
type ProviderType string
const (
ProviderTypeNats ProviderType = "nats"
ProviderTypeNats ProviderType = "nats"
ProviderTypeKafka ProviderType = "kafka"
ProviderTypeRedis ProviderType = "redis"
}

// StreamHookError is used to customize the error messages and the behavior
type StreamHookError struct {
HttpError core.HttpError
CloseSubscription bool
}

// OperationContext already exists, we just have to add the Variables() method
type OperationContext interface {
Name() string
Expand All @@ -48,8 +42,9 @@ type OperationContext interface {

// each provider will have its own event type with custom fields
// the StreamEvent interface is used to allow the hooks system to be provider-agnostic
// there could be common fields in future, but for now we don't need them
type StreamEvent interface {}
type StreamEvent interface {
GetData() []byte
}

// SubscriptionEventConfiguration is the common interface for the subscription event configuration
type SubscriptionEventConfiguration interface {
Expand All @@ -67,7 +62,7 @@ type PublishEventConfiguration interface {
RootFieldName() string
}

type SubscriptionOnStartHookContext interface {
type SubscriptionOnStartHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request
Expand All @@ -85,34 +80,48 @@ type SubscriptionOnStartHookContext interface {

type SubscriptionOnStartHandler interface {
// OnSubscriptionOnStart is called once at subscription start
// Returning an error will result in a GraphQL error being returned to the client, could be customized returning a StreamHookError.
SubscriptionOnStart(ctx SubscriptionOnStartHookContext) error
// Returning an error will result in a GraphQL error being returned to the client
SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
}

type StreamBatchEventHookContext interface {
// the request context
RequestContext() RequestContext
// the subscription event configuration
type StreamReceiveEventHandlerContext interface {
// Request is the initial client request that started the subscription
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// SubscriptionEventConfiguration is the subscription event configuration
SubscriptionEventConfiguration() SubscriptionEventConfiguration
}

type StreamBatchEventHook interface {
// OnStreamEvents is called each time a batch of events is received from the provider
// Returning an error will result in a GraphQL error being returned to the client, could be customized returning a StreamHookError.
OnStreamEvents(ctx StreamBatchEventHookContext, events []StreamEvent) ([]StreamEvent, error)
type StreamReceiveEventHandler interface {
// OnReceiveEvents is called each time a batch of events is received from the provider before delivering them to the client
// So for a single batch of events received from the provider, this hook will be called one time for each active subscription.
// It is important to optimize the logic inside this hook to avoid performance issues.
// Returning an error will result in a GraphQL error being returned to the client
OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}

type StreamPublishEventHookContext interface {
// the request context
RequestContext() RequestContext
// the publish event configuration
type StreamPublishEventHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// PublishEventConfiguration is the publish event configuration
PublishEventConfiguration() PublishEventConfiguration
}

type StreamPublishEventHook interface {
type StreamPublishEventHandler interface {
// OnPublishEvents is called each time a batch of events is going to be sent to the provider
// Returning an error will result in a GraphQL error being returned to the client, could be customized returning a StreamHookError.
OnPublishEvents(ctx StreamPublishEventHookContext, events []StreamEvent) ([]StreamEvent, error)
// Returning an error will result in an error being returned and the client will see the mutation failing
OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}
```

Expand Down Expand Up @@ -154,7 +163,7 @@ type Employee @key(fields: "id", resolvable: false) {
id: Int! @external
}
```
After publishing the schema, the developer will need to add the module to the cosmo streams engine.
After publishing the schema, the developer will need to add the module to the cosmo router.

### 2. Write the custom module

Expand All @@ -177,39 +186,38 @@ func init() {

type MyModule struct {}

func (m *MyModule) OnStreamEvents(ctx StreamBatchEventHookContext, events []core.StreamEvent) ([]core.StreamEvent, error) {
func (m *MyModule) OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) {
// check if the provider is nats
if ctx.StreamContext().ProviderType() != pubsub.ProviderTypeNats {
if ctx.SubscriptionEventConfiguration().ProviderType() != pubsub.ProviderTypeNats {
return events, nil
}

// check if the provider id is the one expected by the module
if ctx.StreamContext().ProviderID() != "my-nats" {
if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" {
return events, nil
}

// check if the subject is the one expected by the module
natsConfig := ctx.SubscriptionEventConfiguration().(*nats.SubscriptionEventConfiguration)
if natsConfig.Subjects[0] != "employeeUpdates" {
return events, nil
}
// check if the subscription is the one expected by the module
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
return events, nil
}

newEvents := make([]core.StreamEvent, 0, len(events))

// check if the client is authenticated
if ctx.RequestContext().Authentication() == nil {
if ctx.Authentication() == nil {
// if the client is not authenticated, return no events
return events, nil
return newEvents, nil
}

// check if the client is allowed to subscribe to the stream
clientAllowedEntitiesIds, found := ctx.RequestContext().Authentication().Claims()["allowedEntitiesIds"]
clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["allowedEntitiesIds"]
if !found {
return events, fmt.Errorf("client is not allowed to subscribe to the stream")
return newEvents, fmt.Errorf("client is not allowed to subscribe to the stream")
}

newEvents := make([]core.StreamEvent, 0, len(events))

for _, evt := range events {
natsEvent, ok := evt.(*nats.NatsEvent);
natsEvent, ok := evt.(*nats.NatsEvent)
if !ok {
newEvents = append(newEvents, evt)
continue
Expand Down Expand Up @@ -266,7 +274,7 @@ func (m *MyModule) Module() core.ModuleInfo {

// Interface guards
var (
_ core.StreamBatchEventHook = (*MyModule)(nil)
_ core.StreamReceiveEventHandler = (*MyModule)(nil)
)
```

Expand Down Expand Up @@ -321,7 +329,7 @@ func init() {

type MyModule struct {}

func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHookContext) error {
func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error {
// check if the provider is nats
if ctx.SubscriptionEventConfiguration().ProviderType() != pubsub.ProviderTypeNats {
return nil
Expand All @@ -332,32 +340,27 @@ func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHookContext) error
return nil
}

// check if the subject is the one expected by the module
natsConfig := ctx.SubscriptionEventConfiguration().(*nats.SubscriptionEventConfiguration)
if natsConfig.Subjects[0] != "employeeUpdates" {
return nil
}
// check if the subscription is the one expected by the module
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
return nil
}

// check if the client is authenticated
if ctx.Authentication() == nil {
// if the client is not authenticated, return an error
return &StreamHookError{
HttpError: core.HttpError{
Code: http.StatusUnauthorized,
Message: "client is not authenticated",
},
return &core.HttpError{
Code: http.StatusUnauthorized,
Message: "client is not authenticated",
CloseSubscription: true,
}
}

// check if the client is allowed to subscribe to the stream
clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["readEmployee"]
if !found {
return &StreamHookError{
HttpError: core.HttpError{
Code: http.StatusForbidden,
Message: "client is not allowed to read employees",
},
return &core.HttpError{
Code: http.StatusForbidden,
Message: "client is not allowed to read employees",
CloseSubscription: true,
}
}
Expand Down Expand Up @@ -405,4 +408,4 @@ We could also generate the AsyncAPI specification from the schema and the events

## Generate hooks from AsyncAPI specifications

Building on the AsyncAPI integration, we could allow the user to define their streams using AsyncAPI and generate fully typesafe hooks with all events structures generated from the AsyncAPI specification.
Building on the AsyncAPI integration, we could allow the user to define their streams using AsyncAPI and generate fully typesafe hooks with all events structures generated from the AsyncAPI specification.
16 changes: 10 additions & 6 deletions demo/pkg/subgraphs/availability/subgraph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions demo/pkg/subgraphs/mood/subgraph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion router-tests/events/events_config_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package events_test

import (
"testing"
Expand Down
Loading
Loading