Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
40d8146
feat: event metrics
SkArchon Aug 13, 2025
a29ce3b
fix: refactoring
SkArchon Aug 14, 2025
7bf5f3e
fix: add request
SkArchon Aug 14, 2025
695f529
fix: refactoring
SkArchon Aug 14, 2025
192f095
fix: add tests
SkArchon Aug 14, 2025
c4fddad
fix: kafka tests
SkArchon Aug 14, 2025
50bd6e5
fix: updates
SkArchon Aug 14, 2025
eb4f235
fix: updates
SkArchon Aug 14, 2025
3f3f5d3
fix: tests
SkArchon Aug 14, 2025
7f8e869
fix: tests
SkArchon Aug 14, 2025
3cd1234
fix: review comments
SkArchon Aug 14, 2025
bbd5f06
fix: tests
SkArchon Aug 14, 2025
0c1c9c0
fix: refactoring
SkArchon Aug 14, 2025
b369054
fix: review comments
SkArchon Aug 14, 2025
7d50c7d
fix: review comments
SkArchon Aug 14, 2025
a8141ff
fix: tests
SkArchon Aug 15, 2025
89e96b2
fix: refactoring
SkArchon Aug 17, 2025
6c7e0a9
fix: tests
SkArchon Aug 17, 2025
b4f9a09
fix: tests
SkArchon Aug 17, 2025
77065ee
fix: tests
SkArchon Aug 17, 2025
933cab4
fix: naming operation
SkArchon Aug 18, 2025
dbc50e5
fix: cleanup
SkArchon Aug 18, 2025
061ffd1
Merge branch 'main' into milinda/eng-7785-subscriptions-metrics
SkArchon Aug 18, 2025
066a57b
Merge branch 'main' into milinda/eng-7785-subscriptions-metrics
SkArchon Aug 18, 2025
b56c97f
fix: improve tests
SkArchon Aug 18, 2025
e72b9ae
fix: attempt to fix subgraph tests
SkArchon Aug 18, 2025
775ea72
fix: revert false positive
SkArchon Aug 18, 2025
e3d63d4
fix: tests
SkArchon Aug 18, 2025
2ac4afa
fix: linting
SkArchon Aug 18, 2025
fb2c05c
fix: review comments
SkArchon Aug 18, 2025
176e960
fix: tests
SkArchon Aug 18, 2025
8a41205
fix: tests
SkArchon Aug 18, 2025
fdbe1f8
fix: tests
SkArchon Aug 18, 2025
c56b7c1
fix: tests
SkArchon Aug 18, 2025
e945f7d
fix: tests
SkArchon Aug 18, 2025
87c67fc
fix: review comments
SkArchon Aug 19, 2025
eed45fc
fix: error entry
SkArchon Aug 19, 2025
430c421
refactor: naming to use stream metrics
SkArchon Aug 19, 2025
1667595
fix: renaming
SkArchon Aug 19, 2025
f4393c0
fix: add router prefix name value
SkArchon Aug 19, 2025
415a816
fix: update description
SkArchon Aug 19, 2025
83bc1f8
Merge remote-tracking branch 'origin/main' into milinda/eng-7785-subs…
SkArchon Aug 19, 2025
0da26e7
fix: renaming
SkArchon Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions demo/pkg/subgraphs/subgraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/99designs/gqlgen/graphql/playground"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
natsPubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -210,13 +212,17 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {

natsPubSubByProviderID := map[string]natsPubsub.Adapter{}

defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test")
defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
return nil, fmt.Errorf("failed to create default nats adapter: %w", err)
}
natsPubSubByProviderID["default"] = defaultAdapter

myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test")
myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
return nil, fmt.Errorf("failed to create my-nats adapter: %w", err)
}
Expand Down
81 changes: 81 additions & 0 deletions router-tests/events/event_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package events
Comment thread
SkArchon marked this conversation as resolved.

import (
"context"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/wundergraph/cosmo/router-tests/testenv"
"net/url"
"testing"
"time"
)

const waitTimeout = time.Second * 30

func ProduceKafkaMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pErrCh := make(chan error)

xEnv.KafkaClient.Produce(ctx, &kgo.Record{
Topic: xEnv.GetPubSubName(topicName),
Value: []byte(message),
}, func(_ *kgo.Record, err error) {
pErrCh <- err
})

testenv.AwaitChannelWithT(t, waitTimeout, pErrCh, func(t *testing.T, pErr error) {
require.NoError(t, pErr)
})

fErr := xEnv.KafkaClient.Flush(ctx)
require.NoError(t, fErr)
}

func EnsureTopicExists(t *testing.T, xEnv *testenv.Environment, topics ...string) {
// Delete topic for idempotency
deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
prefixedTopics := make([]string, 0, len(topics))
for _, topic := range topics {
prefixedTopics = append(prefixedTopics, xEnv.GetPubSubName(topic))
}

_, err := xEnv.KafkaAdminClient.DeleteTopics(deleteCtx, prefixedTopics...)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err = xEnv.KafkaAdminClient.CreateTopics(ctx, 1, 1, nil, prefixedTopics...)
require.NoError(t, err)
}

func ProduceRedisMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

parsedURL, err := url.Parse(xEnv.RedisHosts[0])
if err != nil {
t.Fatalf("Failed to parse Redis URL: %v", err)
}
var redisConn redis.UniversalClient
if !xEnv.RedisWithClusterMode {
redisConn = redis.NewClient(&redis.Options{
Addr: parsedURL.Host,
})
} else {
redisConn = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{parsedURL.Host},
})
}

defer func() {
_ = redisConn.Close()
}()

intCmd := redisConn.Publish(ctx, xEnv.GetPubSubName(topicName), message)
require.NoError(t, intCmd.Err())
}
2 changes: 1 addition & 1 deletion router-tests/events/events_config_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events_test
package events

import (
"testing"
Expand Down
Loading
Loading