diff --git a/demo/pkg/subgraphs/subgraphs.go b/demo/pkg/subgraphs/subgraphs.go index e89e2a5def..323fb46292 100644 --- a/demo/pkg/subgraphs/subgraphs.go +++ b/demo/pkg/subgraphs/subgraphs.go @@ -21,6 +21,8 @@ import ( "github.com/99designs/gqlgen/graphql/playground" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" + rmetric "github.com/wundergraph/cosmo/router/pkg/metric" + "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" natsPubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" "golang.org/x/sync/errgroup" @@ -210,13 +212,17 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) { natsPubSubByProviderID := map[string]natsPubsub.Adapter{} - defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test") + defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + StreamMetricStore: rmetric.NewNoopStreamMetricStore(), + }) if err != nil { return nil, fmt.Errorf("failed to create default nats adapter: %w", err) } natsPubSubByProviderID["default"] = defaultAdapter - myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test") + myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + StreamMetricStore: rmetric.NewNoopStreamMetricStore(), + }) if err != nil { return nil, fmt.Errorf("failed to create my-nats adapter: %w", err) } diff --git a/router-tests/events/event_helpers.go b/router-tests/events/event_helpers.go new file mode 100644 index 0000000000..48d97e90c4 --- /dev/null +++ b/router-tests/events/event_helpers.go @@ -0,0 +1,81 @@ +package events + +import ( + "context" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/wundergraph/cosmo/router-tests/testenv" + "net/url" + "testing" + "time" +) + +const waitTimeout = time.Second * 30 + +func ProduceKafkaMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + pErrCh := make(chan error) + + xEnv.KafkaClient.Produce(ctx, &kgo.Record{ + Topic: xEnv.GetPubSubName(topicName), + Value: []byte(message), + }, func(_ *kgo.Record, err error) { + pErrCh <- err + }) + + testenv.AwaitChannelWithT(t, waitTimeout, pErrCh, func(t *testing.T, pErr error) { + require.NoError(t, pErr) + }) + + fErr := xEnv.KafkaClient.Flush(ctx) + require.NoError(t, fErr) +} + +func EnsureTopicExists(t *testing.T, xEnv *testenv.Environment, topics ...string) { + // Delete topic for idempotency + deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + prefixedTopics := make([]string, 0, len(topics)) + for _, topic := range topics { + prefixedTopics = append(prefixedTopics, xEnv.GetPubSubName(topic)) + } + + _, err := xEnv.KafkaAdminClient.DeleteTopics(deleteCtx, prefixedTopics...) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = xEnv.KafkaAdminClient.CreateTopics(ctx, 1, 1, nil, prefixedTopics...) 
+ require.NoError(t, err) +} + +func ProduceRedisMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + parsedURL, err := url.Parse(xEnv.RedisHosts[0]) + if err != nil { + t.Fatalf("Failed to parse Redis URL: %v", err) + } + var redisConn redis.UniversalClient + if !xEnv.RedisWithClusterMode { + redisConn = redis.NewClient(&redis.Options{ + Addr: parsedURL.Host, + }) + } else { + redisConn = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{parsedURL.Host}, + }) + } + + defer func() { + _ = redisConn.Close() + }() + + intCmd := redisConn.Publish(ctx, xEnv.GetPubSubName(topicName), message) + require.NoError(t, intCmd.Err()) +} diff --git a/router-tests/events/events_config_test.go b/router-tests/events/events_config_test.go index 50d19dbaed..f7e0739e1c 100644 --- a/router-tests/events/events_config_test.go +++ b/router-tests/events/events_config_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "testing" diff --git a/router-tests/events/kafka_events_test.go b/router-tests/events/kafka_events_test.go index 05f4250003..dbc17f870a 100644 --- a/router-tests/events/kafka_events_test.go +++ b/router-tests/events/kafka_events_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "bufio" @@ -74,7 +74,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -107,7 +107,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) @@ -130,7 +130,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) 
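The helper file above exports `EnsureTopicExists`, `ProduceKafkaMessage`, and `ProduceRedisMessage` (and the test packages drop the `_test` suffix) so that the new stream-metrics tests later in this diff can reuse them across packages. A minimal sketch of that cross-package usage, assuming the `testenv` API exactly as it appears in this diff (the package and function names below are hypothetical):

```go
package telemetry // hypothetical consumer package

import (
	"testing"

	"github.com/wundergraph/cosmo/router-tests/events"
	"github.com/wundergraph/cosmo/router-tests/testenv"
)

// produceEmployeeUpdate recreates the topic for idempotency, then publishes a
// single entity event, mirroring how the metrics tests drive their subscriptions.
func produceEmployeeUpdate(t *testing.T, xEnv *testenv.Environment) {
	events.EnsureTopicExists(t, xEnv, "employeeUpdated")
	events.ProduceKafkaMessage(t, xEnv, "employeeUpdated", `{"__typename":"Employee","id":1}`)
}
```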
var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -164,23 +164,23 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], ``) // Empty message + ProduceKafkaMessage(t, xEnv, topics[0], ``) // Empty message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.ErrorContains(t, args.errValue, "Invalid message received") }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) // Missing entity = Resolver error + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) // Missing entity = Resolver error testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.ErrorContains(t, args.errValue, "Cannot return null for non-nullable field 'Subscription.employeeUpdatedMyKafka.id'.") }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) @@ -204,7 +204,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -248,7 +248,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(2, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) @@ -277,7 +277,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) 
var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -321,7 +321,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(2, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) @@ -333,7 +333,7 @@ func TestKafkaEvents(t *testing.T) { require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) }) - produceKafkaMessage(t, xEnv, topics[1], `{"__typename":"Employee","id": 2,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[1], `{"__typename":"Employee","id": 2,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) @@ -366,7 +366,7 @@ func TestKafkaEvents(t *testing.T) { engineExecutionConfiguration.WebSocketClientReadTimeout = time.Millisecond * 100 }, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -399,7 +399,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) @@ -431,7 +431,7 @@ func TestKafkaEvents(t *testing.T) { core.WithMultipartHeartbeatInterval(multipartHeartbeatInterval), }, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`) @@ -447,10 +447,10 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) assertKafkaMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"employeeUpdatedMyKafka\":{\"id\":1,\"details\":{\"forename\":\"Jens\",\"surname\":\"Neuse\"}}}}}") - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) assertKafkaMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"employeeUpdatedMyKafka\":{\"id\":1,\"details\":{\"forename\":\"Jens\",\"surname\":\"Neuse\"}}}}}") }) }) @@ -497,7 +497,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) 
subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`) @@ -530,7 +530,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, responseCh, func(t *testing.T, response struct { response *http.Response @@ -562,7 +562,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`) @@ -595,7 +595,7 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) testenv.AwaitChannelWithT(t, KafkaWaitTimeout, responseCh, func(t *testing.T, resp struct { response *http.Response @@ -672,7 +672,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) type subscriptionPayload struct { Data struct { @@ -713,7 +713,7 @@ func TestKafkaEvents(t *testing.T) { // Events 1, 2, 11, and 12 should be included for i := uint32(1); i < 13; i++ { - produceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) + ProduceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) if i == 1 || i == 2 || i == 11 || i == 12 { conn.SetReadDeadline(time.Now().Add(KafkaWaitTimeout)) gErr := conn.ReadJSON(&msg) @@ -739,7 +739,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) type subscriptionPayload struct { Data struct { @@ -780,7 +780,7 @@ func TestKafkaEvents(t *testing.T) { // Events 1, 2, 11, and 12 should be included for i := uint32(1); i < 13; i++ { - produceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) + ProduceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) if i == 1 || i == 2 || i == 11 || i == 12 { conn.SetReadDeadline(time.Now().Add(KafkaWaitTimeout)) gErr := conn.ReadJSON(&msg) @@ -806,7 +806,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) 
type subscriptionPayload struct { Data struct { @@ -835,10 +835,10 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) // The message should be ignored because "1" does not equal 1 - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) // This message should be delivered because it matches the filter - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":12}`) + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":12}`) conn.SetReadDeadline(time.Now().Add(KafkaWaitTimeout)) readErr := conn.ReadJSON(&msg) require.NoError(t, readErr) @@ -861,7 +861,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) var subscriptionOne struct { employeeUpdatedMyKafka struct { @@ -894,23 +894,23 @@ func TestKafkaEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout) - produceKafkaMessage(t, xEnv, topics[0], `{asas`) // Invalid message + ProduceKafkaMessage(t, xEnv, topics[0], `{asas`) // Invalid message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.ErrorContains(t, args.errValue, "Invalid message received") }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) // Correct message + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) // Correct message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) // Missing entity = Resolver error + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) // Missing entity = Resolver error testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.ErrorContains(t, args.errValue, "Cannot return null for non-nullable field 'Subscription.employeeUpdatedMyKafka.id'.") }) - produceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message + ProduceKafkaMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message testenv.AwaitChannelWithT(t, KafkaWaitTimeout, subscriptionOneArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) { require.NoError(t, args.errValue) require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) @@ -932,7 +932,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) 
// Send a mutation to trigger the first subscription resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ @@ -980,7 +980,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, }, func(t *testing.T, xEnv *testenv.Environment) { - ensureTopicExists(t, xEnv, topics...) + EnsureTopicExists(t, xEnv, topics...) type subscriptionPayload struct { Data struct { @@ -1024,7 +1024,7 @@ func TestKafkaEvents(t *testing.T) { // Events 1, 3, 4, 7, and 11 should be included for i := int(MsgCount); i > 0; i-- { - produceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) + ProduceKafkaMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) if i == 1 || i == 3 || i == 4 || i == 7 || i == 11 { conn.SetReadDeadline(time.Now().Add(KafkaWaitTimeout)) jsonErr := conn.ReadJSON(&msg) @@ -1042,46 +1042,6 @@ func TestKafkaEvents(t *testing.T) { }) } -func ensureTopicExists(t *testing.T, xEnv *testenv.Environment, topics ...string) { - // Delete topic for idempotency - deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - prefixedTopics := make([]string, len(topics)) - for _, topic := range topics { - prefixedTopics = append(prefixedTopics, xEnv.GetPubSubName(topic)) - } - - _, err := xEnv.KafkaAdminClient.DeleteTopics(deleteCtx, prefixedTopics...) - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - _, err = xEnv.KafkaAdminClient.CreateTopics(ctx, 1, 1, nil, prefixedTopics...) - require.NoError(t, err) -} - -func produceKafkaMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - pErrCh := make(chan error) - - xEnv.KafkaClient.Produce(ctx, &kgo.Record{ - Topic: xEnv.GetPubSubName(topicName), - Value: []byte(message), - }, func(record *kgo.Record, err error) { - pErrCh <- err - }) - - testenv.AwaitChannelWithT(t, KafkaWaitTimeout, pErrCh, func(t *testing.T, pErr error) { - require.NoError(t, pErr) - }) - - fErr := xEnv.KafkaClient.Flush(ctx) - require.NoError(t, fErr) -} - func readKafkaMessages(xEnv *testenv.Environment, topicName string, msgs int) ([]*kgo.Record, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index e442ccf9be..d3235643b4 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "bufio" diff --git a/router-tests/events/redis_events_test.go b/router-tests/events/redis_events_test.go index 407e9a9348..1c287f7d6f 100644 --- a/router-tests/events/redis_events_test.go +++ b/router-tests/events/redis_events_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "bufio" @@ -104,7 +104,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // process the message select { @@ -170,7 +170,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce an empty message - 
produceRedisMessage(t, xEnv, topics[0], ``) + ProduceRedisMessage(t, xEnv, topics[0], ``) // process the message select { case subscriptionArgs := <-subscriptionArgsCh: @@ -181,7 +181,7 @@ func TestRedisEvents(t *testing.T) { t.Fatal("timeout waiting for first message error") } - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // Correct message select { case subscriptionArgs := <-subscriptionArgsCh: require.NoError(t, subscriptionArgs.errValue) @@ -191,7 +191,7 @@ func TestRedisEvents(t *testing.T) { } // Missing entity = Resolver error - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) select { case subscriptionArgs := <-subscriptionArgsCh: var gqlErr graphql.Errors @@ -202,7 +202,7 @@ func TestRedisEvents(t *testing.T) { } // Correct message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) select { case subscriptionArgs := <-subscriptionArgsCh: require.NoError(t, subscriptionArgs.errValue) @@ -273,7 +273,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(2, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message from the first subscription select { @@ -354,7 +354,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(2, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message from the first subscription select { @@ -375,7 +375,7 @@ func TestRedisEvents(t *testing.T) { } // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 2,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 2,"update":{"name":"foo"}}`) // read the message from the first subscription select { @@ -451,7 +451,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message from the subscription select { @@ -509,12 +509,12 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message from the subscription assertRedisMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"employeeUpdates\":{\"id\":1,\"details\":{\"forename\":\"Jens\",\"surname\":\"Neuse\"}}}}}") // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 
1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message from the subscription assertRedisMultipartValueEventually(t, reader, "{\"payload\":{\"data\":{\"employeeUpdates\":{\"id\":1,\"details\":{\"forename\":\"Jens\",\"surname\":\"Neuse\"}}}}}") }) @@ -590,7 +590,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message so that the subscription is triggered - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // get the client response var clientRet struct { @@ -663,7 +663,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message so that the subscription is triggered - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // get the client response var clientRet struct { @@ -792,7 +792,7 @@ func TestRedisEvents(t *testing.T) { // Events 1, 3, 4, 7, and 11 should be included for i := MsgCount; i > 0; i-- { - produceRedisMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) + ProduceRedisMessage(t, xEnv, topics[0], fmt.Sprintf(`{"__typename":"Employee","id":%d}`, i)) if i == 11 || i == 7 || i == 4 || i == 3 || i == 1 { gErr := conn.ReadJSON(&msg) @@ -853,7 +853,7 @@ func TestRedisEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce an invalid message - produceRedisMessage(t, xEnv, topics[0], `{asas`) + ProduceRedisMessage(t, xEnv, topics[0], `{asas`) // get the client response select { case args := <-subscriptionOneArgsCh: @@ -865,7 +865,7 @@ func TestRedisEvents(t *testing.T) { } // produce a correct message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id":1}`) // get the client response select { case args := <-subscriptionOneArgsCh: @@ -876,7 +876,7 @@ func TestRedisEvents(t *testing.T) { } // produce a message with a missing entity - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","update":{"name":"foo"}}`) // get the client response select { case args := <-subscriptionOneArgsCh: @@ -888,7 +888,7 @@ func TestRedisEvents(t *testing.T) { } // produce a correct message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // get the client response select { case args := <-subscriptionOneArgsCh: @@ -991,7 +991,7 @@ func TestRedisClusterEvents(t *testing.T) { xEnv.WaitForSubscriptionCount(1, RedisWaitTimeout) // produce a message - produceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + ProduceRedisMessage(t, xEnv, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) // read the message select { @@ -1047,29 +1047,6 @@ func TestRedisClusterEvents(t *testing.T) { } -func produceRedisMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) { - ctx, cancel := 
context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - parsedURL, err := url.Parse(xEnv.RedisHosts[0]) - if err != nil { - t.Fatalf("Failed to parse Redis URL: %v", err) - } - var redisConn redis.UniversalClient - if !xEnv.RedisWithClusterMode { - redisConn = redis.NewClient(&redis.Options{ - Addr: parsedURL.Host, - }) - } else { - redisConn = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: []string{parsedURL.Host}, - }) - } - - intCmd := redisConn.Publish(ctx, xEnv.GetPubSubName(topicName), message) - require.NoError(t, intCmd.Err()) -} - func readRedisMessages(t *testing.T, xEnv *testenv.Environment, channelName string) (<-chan *redis.Message, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/router-tests/prometheus_stream_metrics_test.go b/router-tests/prometheus_stream_metrics_test.go new file mode 100644 index 0000000000..30fa87fe16 --- /dev/null +++ b/router-tests/prometheus_stream_metrics_test.go @@ -0,0 +1,468 @@ +package integration + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/hasura/go-graphql-client" + "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/events" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/pkg/config" + "go.opentelemetry.io/otel/sdk/metric" +) + +type subscriptionArgs struct { + dataValue []byte + errValue error +} + +const WaitTimeout = time.Second * 30 + +func TestFlakyEventMetrics(t *testing.T) { + t.Parallel() + + t.Run("kafka", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + MetricOptions: testenv.MetricOptions{ + EnablePrometheusStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + events.EnsureTopicExists(t, xEnv, "employeeUpdated") + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`}) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_sent_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "produce", operation.GetValue()) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "kafka", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdated")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-kafka", provider.GetValue()) + + require.Equal(t, float64(2), metrics[0].Counter.GetValue()) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + 
promRegistry := prometheus.NewRegistry() + topic := "employeeUpdated" + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + MetricOptions: testenv.MetricOptions{ + EnablePrometheusStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + events.EnsureTopicExists(t, xEnv, topic) + + var subscriptionOne struct { + employeeUpdatedMyKafka struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{dataValue: dataValue, errValue: errValue} + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + clientRunCh := make(chan error) + go func() { clientRunCh <- client.Run() }() + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + + events.ProduceKafkaMessage(t, xEnv, topic, `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_received_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "receive", operation.GetValue()) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "kafka", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdated")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-kafka", provider.GetValue()) + + require.Equal(t, float64(1), metrics[0].Counter.GetValue()) + }) + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, clientRunCh, func(t *testing.T, err error) { require.NoError(t, err) }) + }) + }) + }) + + t.Run("nats", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + MetricOptions: testenv.MetricOptions{ + EnablePrometheusStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) 
{ + updateEmployeeMyNats(id: 12, update: $update) {success} + }`, Variables: json.RawMessage(`{"update":{"name":"n1"}}`)}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) { + updateEmployeeMyNats(id: 12, update: $update) {success} + }`, Variables: json.RawMessage(`{"update":{"name":"n2"}}`)}) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_sent_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "publish", operation.GetValue()) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "nats", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdatedMyNats.12")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-nats", provider.GetValue()) + + require.Equal(t, float64(2), metrics[0].Counter.GetValue()) + }) + }) + + t.Run("request", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + MetricOptions: testenv.MetricOptions{ + EnablePrometheusStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + sub, err := xEnv.NatsConnectionMyNats.Subscribe(xEnv.GetPubSubName("getEmployeeMyNats.12"), func(msg *nats.Msg) { _ = msg.Respond([]byte(`{"id": 12, "__typename": "Employee"}`)) }) + require.NoError(t, err) + require.NoError(t, xEnv.NatsConnectionMyNats.Flush()) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `query { employeeFromEventMyNats(employeeID: 12) { id details { forename } }}`}) + require.JSONEq(t, `{"data":{"employeeFromEventMyNats": {"id": 12, "details": {"forename": "David"}}}}`, res.Body) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_sent_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "request", operation.GetValue()) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "nats", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "getEmployeeMyNats.12")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-nats", provider.GetValue()) + + require.Equal(t, float64(1), metrics[0].Counter.GetValue()) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + 
EnableNats: true, + ModifyEngineExecutionConfiguration: func(ec *config.EngineExecutionConfiguration) { ec.WebSocketClientReadTimeout = time.Second }, + MetricOptions: testenv.MetricOptions{EnablePrometheusStreamMetrics: true}, + }, func(t *testing.T, xEnv *testenv.Environment) { + var subscriptionOne struct { + employeeUpdated struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdated(employeeID: 3)"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{ + dataValue: dataValue, + errValue: errValue, + } + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + + clientRunErrCh := make(chan error) + go func() { + clientRunErrCh <- client.Run() + }() + + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + + err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.3"), []byte(`{"id":3,"__typename":"Employee"}`)) + require.NoError(t, err) + + err = xEnv.NatsConnectionDefault.Flush() + require.NoError(t, err) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdated":{"id":3,"details":{"forename":"Stefan","surname":"Avram"}}}`, string(args.dataValue)) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_received_messages_total") + metrics := family.GetMetric() + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "receive", operation.GetValue()) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "nats", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdated.3")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "default", provider.GetValue()) + + require.Equal(t, float64(1), metrics[0].Counter.GetValue()) + }) + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, clientRunErrCh, func(t *testing.T, err error) { + require.NoError(t, err) + }, "unable to close client before timeout") + + xEnv.WaitForSubscriptionCount(0, WaitTimeout) + xEnv.WaitForConnectionCount(0, WaitTimeout) + }) + }) + }) + + t.Run("redis", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, + EnableRedis: true, + MetricOptions: testenv.MetricOptions{ + EnablePrometheusStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyRedis(id: 3, update: {name: "r1"}) { success } }`}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { 
updateEmployeeMyRedis(id: 3, update: {name: "r2"}) { success } }`}) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_sent_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "publish", operation.GetValue()) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "redis", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdatedMyRedis")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-redis", provider.GetValue()) + + require.Equal(t, float64(2), metrics[0].Counter.GetValue()) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, + EnableRedis: true, + MetricOptions: testenv.MetricOptions{EnablePrometheusStreamMetrics: true}, + }, func(t *testing.T, xEnv *testenv.Environment) { + topic := "employeeUpdatedMyRedis" + + var subscriptionOne struct { + employeeUpdates struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdates"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{dataValue, errValue} + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + + runCh := make(chan error) + go func() { runCh <- client.Run() }() + + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + events.ProduceRedisMessage(t, xEnv, topic, `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdates":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) + + mf, err := promRegistry.Gather() + require.NoError(t, err) + + family := findMetricFamilyByName(mf, "router_streams_received_messages_total") + metrics := family.GetMetric() + require.Len(t, metrics, 1) + + errLabel := findMetricLabelByName(metrics, "wg_error_type") + require.Nil(t, errLabel) + + operation := findMetricLabelByName(metrics, "wg_stream_operation_name") + require.Equal(t, "receive", operation.GetValue()) + + system := findMetricLabelByName(metrics, "wg_provider_type") + require.Equal(t, "redis", system.GetValue()) + + destination := findMetricLabelByName(metrics, "wg_destination_name") + require.True(t, strings.HasSuffix(destination.GetValue(), "employeeUpdatedMyRedis")) + + provider := findMetricLabelByName(metrics, "wg_provider_id") + require.NotNil(t, provider) + require.Equal(t, "my-redis", provider.GetValue()) + require.Equal(t, float64(1), metrics[0].Counter.GetValue()) + }) + + 
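The Prometheus assertions above lean on `findMetricFamilyByName` and `findMetricLabelByName`, which are defined elsewhere in `router-tests` and are not part of this diff. A sketch of what they presumably look like, with signatures inferred from the call sites (an assumption, not the actual implementation):

```go
package integration // sketch only; the real helpers live elsewhere in router-tests

import dto "github.com/prometheus/client_model/go"

// findMetricFamilyByName returns the gathered metric family with the given name, or nil.
func findMetricFamilyByName(mfs []*dto.MetricFamily, name string) *dto.MetricFamily {
	for _, mf := range mfs {
		if mf.GetName() == name {
			return mf
		}
	}
	return nil
}

// findMetricLabelByName returns the first label pair with the given name across
// the supplied metrics, or nil if no metric carries that label.
func findMetricLabelByName(metrics []*dto.Metric, name string) *dto.LabelPair {
	for _, m := range metrics {
		for _, lp := range m.GetLabel() {
			if lp.GetName() == name {
				return lp
			}
		}
	}
	return nil
}
```

The nil returns are what let the tests assert `require.Nil(t, errLabel)` when no `wg_error_type` label was recorded.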
require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, runCh, func(t *testing.T, err error) { require.NoError(t, err) }) + }) + }) + }) +} diff --git a/router-tests/telemetry/stream_metrics_test.go b/router-tests/telemetry/stream_metrics_test.go new file mode 100644 index 0000000000..72ac7c654f --- /dev/null +++ b/router-tests/telemetry/stream_metrics_test.go @@ -0,0 +1,500 @@ +package telemetry + +import ( + "context" + "encoding/json" + "strings" + "testing" + "time" + + "github.com/hasura/go-graphql-client" + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" + integration "github.com/wundergraph/cosmo/router-tests" + "github.com/wundergraph/cosmo/router-tests/events" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/pkg/config" + otelattrs "github.com/wundergraph/cosmo/router/pkg/otel" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type subscriptionArgs struct { + dataValue []byte + errValue error +} + +const WaitTimeout = time.Second * 30 + +func TestFlakyEventMetrics(t *testing.T) { + t.Parallel() + + t.Run("kafka", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + MetricOptions: testenv.MetricOptions{ + EnableOTLPStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + events.EnsureTopicExists(t, xEnv, "employeeUpdated") + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`}) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.sent.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + require.Len(t, sum.DataPoints, 1) + + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "produce", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "kafka", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdated")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-kafka", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(2), sum.DataPoints[0].Value) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + topic := "employeeUpdated" + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + MetricOptions: testenv.MetricOptions{ + EnableOTLPStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + events.EnsureTopicExists(t, xEnv, topic) + + 
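The OTLP variants in this file make the same assertions through `integration.GetMetricScopeByName` and `integration.GetMetricByName` from the `router-tests` root package, used above and throughout the tests below. Again a sketch with signatures inferred from the call sites in this diff (assumed, not the actual implementation):

```go
package integration // sketch only; the real helpers live in the router-tests root package

import "go.opentelemetry.io/otel/sdk/metric/metricdata"

// GetMetricScopeByName returns the scope metrics whose instrumentation scope
// matches the given name, or nil.
func GetMetricScopeByName(scopes []metricdata.ScopeMetrics, name string) *metricdata.ScopeMetrics {
	for i := range scopes {
		if scopes[i].Scope.Name == name {
			return &scopes[i]
		}
	}
	return nil
}

// GetMetricByName returns the named metric inside a scope, or nil.
func GetMetricByName(scope *metricdata.ScopeMetrics, name string) *metricdata.Metrics {
	for i := range scope.Metrics {
		if scope.Metrics[i].Name == name {
			return &scope.Metrics[i]
		}
	}
	return nil
}
```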
var subscriptionOne struct { + employeeUpdatedMyKafka struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{dataValue: dataValue, errValue: errValue} + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + clientRunCh := make(chan error) + go func() { clientRunCh <- client.Run() }() + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + + events.ProduceKafkaMessage(t, xEnv, topic, `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdatedMyKafka":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.received.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "receive", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "kafka", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdated")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-kafka", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(1), sum.DataPoints[0].Value) + }) + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, clientRunCh, func(t *testing.T, err error) { require.NoError(t, err) }) + }) + }) + }) + + t.Run("nats", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + MetricOptions: testenv.MetricOptions{ + EnableOTLPStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) { + updateEmployeeMyNats(id: 12, update: $update) {success} + }`, Variables: json.RawMessage(`{"update":{"name":"n1"}}`)}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) 
{ + updateEmployeeMyNats(id: 12, update: $update) {success} + }`, Variables: json.RawMessage(`{"update":{"name":"n2"}}`)}) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.sent.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "publish", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "nats", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdatedMyNats.12")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-nats", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(2), sum.DataPoints[0].Value) + }) + }) + + t.Run("request", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + MetricOptions: testenv.MetricOptions{ + EnableOTLPStreamMetrics: true, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + sub, err := xEnv.NatsConnectionMyNats.Subscribe(xEnv.GetPubSubName("getEmployeeMyNats.12"), func(msg *nats.Msg) { _ = msg.Respond([]byte(`{"id": 12, "__typename": "Employee"}`)) }) + require.NoError(t, err) + require.NoError(t, xEnv.NatsConnectionMyNats.Flush()) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `query { employeeFromEventMyNats(employeeID: 12) { id details { forename } }}`}) + require.JSONEq(t, `{"data":{"employeeFromEventMyNats": {"id": 12, "details": {"forename": "David"}}}}`, res.Body) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.sent.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "request", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "nats", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "getEmployeeMyNats.12")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-nats", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(1), sum.DataPoints[0].Value) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: 
testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyEngineExecutionConfiguration: func(ec *config.EngineExecutionConfiguration) { ec.WebSocketClientReadTimeout = time.Second }, + MetricOptions: testenv.MetricOptions{EnableOTLPStreamMetrics: true}, + }, func(t *testing.T, xEnv *testenv.Environment) { + var subscriptionOne struct { + employeeUpdated struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdated(employeeID: 3)"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{ + dataValue: dataValue, + errValue: errValue, + } + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + + clientRunErrCh := make(chan error) + go func() { + clientRunErrCh <- client.Run() + }() + + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + + // Send a mutation to trigger the first subscription + err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.3"), []byte(`{"id":3,"__typename":"Employee"}`)) + require.NoError(t, err) + + err = xEnv.NatsConnectionDefault.Flush() + require.NoError(t, err) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdated":{"id":3,"details":{"forename":"Stefan","surname":"Avram"}}}`, string(args.dataValue)) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.received.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "receive", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "nats", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdated.3")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "default", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(1), sum.DataPoints[0].Value) + }) + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, clientRunErrCh, func(t *testing.T, err error) { + require.NoError(t, err) + }, "unable to close client before timeout") + + xEnv.WaitForSubscriptionCount(0, WaitTimeout) + xEnv.WaitForConnectionCount(0, WaitTimeout) + }) + }) + }) + + t.Run("redis", func(t *testing.T) { + t.Parallel() + + t.Run("publish", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, + EnableRedis: true, + MetricOptions: testenv.MetricOptions{ + EnableOTLPStreamMetrics: true, + }, + }, func(t *testing.T, 
xEnv *testenv.Environment) { + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyRedis(id: 3, update: {name: "r1"}) { success } }`}) + xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `mutation { updateEmployeeMyRedis(id: 3, update: {name: "r2"}) { success } }`}) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.sent.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + require.Equal(t, "publish", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "redis", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdatedMyRedis")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-redis", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(2), sum.DataPoints[0].Value) + }) + }) + + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, + EnableRedis: true, + MetricOptions: testenv.MetricOptions{EnableOTLPStreamMetrics: true}, + }, func(t *testing.T, xEnv *testenv.Environment) { + topic := "employeeUpdatedMyRedis" + + var subscriptionOne struct { + employeeUpdates struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdates"` + } + + client := graphql.NewSubscriptionClient(xEnv.GraphQLWebSocketSubscriptionURL()) + + subscriptionArgsCh := make(chan subscriptionArgs) + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + subscriptionArgsCh <- subscriptionArgs{dataValue, errValue} + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, subscriptionOneID) + + runCh := make(chan error) + go func() { runCh <- client.Run() }() + + xEnv.WaitForSubscriptionCount(1, WaitTimeout) + events.ProduceRedisMessage(t, xEnv, topic, `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`) + + testenv.AwaitChannelWithT(t, WaitTimeout, subscriptionArgsCh, func(t *testing.T, args subscriptionArgs) { + require.NoError(t, args.errValue) + require.JSONEq(t, `{"employeeUpdates":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}`, string(args.dataValue)) + + rm := metricdata.ResourceMetrics{} + require.NoError(t, metricReader.Collect(context.Background(), &rm)) + + scope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.streams") + require.NotNil(t, scope) + metricEntry := integration.GetMetricByName(scope, "router.streams.received.messages") + require.NotNil(t, metricEntry) + + sum, _ := metricEntry.Data.(metricdata.Sum[int64]) + + require.Len(t, sum.DataPoints, 1) + attrs := sum.DataPoints[0].Attributes + + operation, _ := attrs.Value(otelattrs.WgStreamOperationName) + 
require.Equal(t, "receive", operation.AsString()) + + system, _ := attrs.Value(otelattrs.WgProviderType) + require.Equal(t, "redis", system.AsString()) + + destination, _ := attrs.Value(otelattrs.WgDestinationName) + require.True(t, strings.HasSuffix(destination.AsString(), "employeeUpdatedMyRedis")) + + provider, hasProvider := attrs.Value(otelattrs.WgProviderId) + require.True(t, hasProvider) + require.Equal(t, "my-redis", provider.AsString()) + + _, hasErr := attrs.Value(otelattrs.WgErrorType) + require.False(t, hasErr) + + require.Equal(t, int64(1), sum.DataPoints[0].Value) + }) + + require.NoError(t, client.Close()) + testenv.AwaitChannelWithT(t, WaitTimeout, runCh, func(t *testing.T, err error) { require.NoError(t, err) }) + }) + }) + }) +} diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index d484f3be49..267483d7bc 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -63,6 +63,7 @@ import ( "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" "github.com/wundergraph/cosmo/router/pkg/logging" rmetric "github.com/wundergraph/cosmo/router/pkg/metric" + "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" pubsubNats "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" ) @@ -269,8 +270,10 @@ type MetricOptions struct { PrometheusSchemaFieldUsage PrometheusSchemaFieldUsage EnableOTLPConnectionMetrics bool EnableOTLPCircuitBreakerMetrics bool + EnableOTLPStreamMetrics bool EnablePrometheusConnectionMetrics bool EnablePrometheusCircuitBreakerMetrics bool + EnablePrometheusStreamMetrics bool } type PrometheusSchemaFieldUsage struct { @@ -1504,6 +1507,7 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics, ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics, ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels, + Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics, ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo, PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{ Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled, @@ -1526,6 +1530,7 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node Enabled: true, RouterRuntime: testConfig.MetricOptions.EnableRuntimeMetrics, GraphqlCache: testConfig.MetricOptions.EnableOTLPRouterCache, + Streams: testConfig.MetricOptions.EnableOTLPStreamMetrics, ConnectionStats: testConfig.MetricOptions.EnableOTLPConnectionMetrics, EngineStats: config.EngineStats{ Subscriptions: testConfig.MetricOptions.OTLPEngineStatsOptions.EnableSubscription, @@ -2819,7 +2824,9 @@ func subgraphOptions(ctx context.Context, t testing.TB, logger *zap.Logger, nats } natsPubSubByProviderID := make(map[string]pubsubNats.Adapter, len(DemoNatsProviders)) for _, sourceName := range DemoNatsProviders { - adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr") + adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", datasource.ProviderOpts{ + StreamMetricStore: rmetric.NewNoopStreamMetricStore(), + }) require.NoError(t, err) require.NoError(t, adapter.Startup(ctx)) t.Cleanup(func() { diff --git a/router/core/factoryresolver.go b/router/core/factoryresolver.go index 41f0c4d425..57a3bd172f 100644 --- 
a/router/core/factoryresolver.go +++ b/router/core/factoryresolver.go @@ -8,6 +8,8 @@ import ( "net/url" "slices" + rmetric "github.com/wundergraph/cosmo/router/pkg/metric" + "github.com/buger/jsonparser" "github.com/wundergraph/cosmo/router/pkg/grpcconnector" "github.com/wundergraph/cosmo/router/pkg/pubsub" @@ -211,6 +213,7 @@ type RouterEngineConfiguration struct { Headers *config.HeaderRules Events config.EventsConfiguration SubgraphErrorPropagation config.SubgraphErrorPropagationConfiguration + StreamMetricStore rmetric.StreamMetricStore } func mapProtoFilterToPlanFilter(input *nodev1.SubscriptionFilterCondition, output *plan.SubscriptionFilterCondition) *plan.SubscriptionFilterCondition { @@ -470,6 +473,7 @@ func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nod factoryProviders, factoryDataSources, err := pubsub.BuildProvidersAndDataSources( l.ctx, routerEngineConfig.Events, + routerEngineConfig.StreamMetricStore, l.logger, pubSubDS, l.resolver.InstanceData().HostName, diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 2e4899495d..f79b9f0f92 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -518,6 +518,7 @@ type graphMux struct { metricStore rmetric.Store prometheusCacheMetrics *rmetric.CacheMetrics otelCacheMetrics *rmetric.CacheMetrics + streamMetricStore rmetric.StreamMetricStore } // buildOperationCaches creates the caches for the graph mux. @@ -759,6 +760,12 @@ func (s *graphMux) Shutdown(ctx context.Context) error { } } + if s.streamMetricStore != nil { + if aErr := s.streamMetricStore.Shutdown(ctx); aErr != nil { + err = errors.Join(err, aErr) + } + } + if err != nil { return fmt.Errorf("shutdown graph mux: %w", err) } @@ -774,7 +781,8 @@ func (s *graphServer) buildGraphMux( opts BuildGraphMuxOptions, ) (*graphMux, error) { gm := &graphMux{ - metricStore: rmetric.NewNoopMetrics(), + metricStore: rmetric.NewNoopMetrics(), + streamMetricStore: rmetric.NewNoopStreamMetricStore(), } httpRouter := chi.NewRouter() @@ -872,6 +880,19 @@ func (s *graphServer) buildGraphMux( } } + if s.metricConfig.OpenTelemetry.Streams || s.metricConfig.Prometheus.Streams { + store, err := rmetric.NewStreamMetricStore( + s.logger, + baseMetricAttributes, + s.otlpMeterProvider, + s.promMeterProvider, + s.metricConfig) + if err != nil { + return nil, err + } + gm.streamMetricStore = store + } + subgraphs, err := configureSubgraphOverwrites( opts.EngineConfig, opts.ConfigSubgraphs, @@ -1115,6 +1136,7 @@ func (s *graphServer) buildGraphMux( Headers: s.headerRules, Events: s.eventsConfig, SubgraphErrorPropagation: s.subgraphErrorPropagation, + StreamMetricStore: gm.streamMetricStore, } // map[string]*http.Transport cannot be coerced into map[string]http.RoundTripper, unfortunately diff --git a/router/core/plan_generator.go b/router/core/plan_generator.go index 7a05ec8e7c..3297a4a3bf 100644 --- a/router/core/plan_generator.go +++ b/router/core/plan_generator.go @@ -7,6 +7,8 @@ import ( "net/http" "os" + "github.com/wundergraph/cosmo/router/pkg/metric" + log "github.com/jensneuse/abstractlogger" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astnormalization" @@ -253,7 +255,9 @@ func (pg *PlanGenerator) buildRouterConfig(configFilePath string) (*nodev1.Route } func (pg *PlanGenerator) loadConfiguration(routerConfig *nodev1.RouterConfig, logger *zap.Logger, maxDataSourceCollectorsConcurrency uint) error { - routerEngineConfig := RouterEngineConfiguration{} + routerEngineConfig 
:= RouterEngineConfiguration{ + StreamMetricStore: metric.NewNoopStreamMetricStore(), + } natSources := map[string]*nats.ProviderAdapter{} kafkaSources := map[string]*kafka.ProviderAdapter{} for _, ds := range routerConfig.GetEngineConfig().GetDatasourceConfigurations() { @@ -387,4 +391,4 @@ func findOperationName(operation *ast.Document) (operationName []byte) { } } return nil -} \ No newline at end of file +} diff --git a/router/core/router.go b/router/core/router.go index e71f52e86f..99ce3944c9 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -2240,6 +2240,7 @@ func MetricConfigFromTelemetry(cfg *config.Telemetry) *rmetric.Config { }, Exporters: openTelemetryExporters, CircuitBreaker: cfg.Metrics.OTLP.CircuitBreaker, + Streams: cfg.Metrics.OTLP.Streams, ExcludeMetrics: cfg.Metrics.OTLP.ExcludeMetrics, ExcludeMetricLabels: cfg.Metrics.OTLP.ExcludeMetricLabels, }, @@ -2255,6 +2256,7 @@ func MetricConfigFromTelemetry(cfg *config.Telemetry) *rmetric.Config { CircuitBreaker: cfg.Metrics.Prometheus.CircuitBreaker, ExcludeMetrics: cfg.Metrics.Prometheus.ExcludeMetrics, ExcludeMetricLabels: cfg.Metrics.Prometheus.ExcludeMetricLabels, + Streams: cfg.Metrics.Prometheus.Streams, ExcludeScopeInfo: cfg.Metrics.Prometheus.ExcludeScopeInfo, PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{ Enabled: cfg.Metrics.Prometheus.SchemaFieldUsage.Enabled, diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 6042857a7a..cfd10de0c6 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -100,6 +100,7 @@ type Prometheus struct { ListenAddr string `yaml:"listen_addr" envDefault:"127.0.0.1:8088" env:"PROMETHEUS_LISTEN_ADDR"` GraphqlCache bool `yaml:"graphql_cache" envDefault:"false" env:"PROMETHEUS_GRAPHQL_CACHE"` ConnectionStats bool `yaml:"connection_stats" envDefault:"false" env:"PROMETHEUS_CONNECTION_STATS"` + Streams bool `yaml:"streams" envDefault:"false" env:"PROMETHEUS_STREAM"` EngineStats EngineStats `yaml:"engine_stats" envPrefix:"PROMETHEUS_"` CircuitBreaker bool `yaml:"circuit_breaker" envDefault:"false" env:"PROMETHEUS_CIRCUIT_BREAKER"` ExcludeMetrics RegExArray `yaml:"exclude_metrics,omitempty" env:"PROMETHEUS_EXCLUDE_METRICS"` @@ -137,6 +138,7 @@ type MetricsOTLP struct { ConnectionStats bool `yaml:"connection_stats" envDefault:"false" env:"METRICS_OTLP_CONNECTION_STATS"` EngineStats EngineStats `yaml:"engine_stats" envPrefix:"METRICS_OTLP_"` CircuitBreaker bool `yaml:"circuit_breaker" envDefault:"false" env:"METRICS_OTLP_CIRCUIT_BREAKER"` + Streams bool `yaml:"streams" envDefault:"false" env:"METRICS_OTLP_STREAM"` ExcludeMetrics RegExArray `yaml:"exclude_metrics,omitempty" env:"METRICS_OTLP_EXCLUDE_METRICS"` ExcludeMetricLabels RegExArray `yaml:"exclude_metric_labels,omitempty" env:"METRICS_OTLP_EXCLUDE_METRIC_LABELS"` Exporters []MetricsOTLPExporter `yaml:"exporters"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 783ddfb9f1..7f68126968 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1066,6 +1066,11 @@ "default": false, "description": "Enable the collection of connection stats. The default value is false." }, + "streams": { + "type": "boolean", + "default": false, + "description": "Enable the collection of stream metrics. This contains metrics related to EDFS. The default value is false." 
+ }, "circuit_breaker": { "type": "boolean", "default": false, @@ -1171,6 +1176,11 @@ "default": false, "description": "Enable the collection of connection stats. The default value is false." }, + "streams": { + "type": "boolean", + "default": false, + "description": "Enable the collection of stream metrics. This contains metrics related to EDFS. The default value is false." + }, "circuit_breaker": { "type": "boolean", "default": false, diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 7ea1f5c5c4..57ca8d5856 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -40,6 +40,7 @@ "Subscriptions": false }, "CircuitBreaker": false, + "Streams": false, "ExcludeMetrics": null, "ExcludeMetricLabels": null, "Exporters": null @@ -50,6 +51,7 @@ "ListenAddr": "127.0.0.1:8088", "GraphqlCache": false, "ConnectionStats": false, + "Streams": false, "EngineStats": { "Subscriptions": false }, diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 4dc2ff52be..d5190f302e 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -61,6 +61,7 @@ "Subscriptions": true }, "CircuitBreaker": false, + "Streams": false, "ExcludeMetrics": null, "ExcludeMetricLabels": null, "Exporters": [ @@ -80,6 +81,7 @@ "ListenAddr": "127.0.0.1:8088", "GraphqlCache": true, "ConnectionStats": true, + "Streams": false, "EngineStats": { "Subscriptions": true }, diff --git a/router/pkg/metric/config.go b/router/pkg/metric/config.go index 02b8fb87bd..7a2c15e620 100644 --- a/router/pkg/metric/config.go +++ b/router/pkg/metric/config.go @@ -35,6 +35,7 @@ type PrometheusConfig struct { ExcludeScopeInfo bool // Prometheus schema field usage configuration PromSchemaFieldUsage PrometheusSchemaFieldUsage + Streams bool } type PrometheusSchemaFieldUsage struct { @@ -79,6 +80,7 @@ type OpenTelemetry struct { ExcludeMetricLabels []*regexp.Regexp // TestReader is used for testing purposes. If set, the reader will be used instead of the configured exporters. 
TestReader sdkmetric.Reader + Streams bool } func GetDefaultExporter(cfg *Config) *OpenTelemetryExporter { diff --git a/router/pkg/metric/noop_stream_metrics.go b/router/pkg/metric/noop_stream_metrics.go new file mode 100644 index 0000000000..c312cc2472 --- /dev/null +++ b/router/pkg/metric/noop_stream_metrics.go @@ -0,0 +1,15 @@ +package metric + +import ( + "context" +) + +type NoopStreamMetricStore struct{} + +func (n *NoopStreamMetricStore) Produce(ctx context.Context, event StreamsEvent) {} +func (n *NoopStreamMetricStore) Consume(ctx context.Context, event StreamsEvent) {} + +func (n *NoopStreamMetricStore) Flush(ctx context.Context) error { return nil } +func (n *NoopStreamMetricStore) Shutdown(ctx context.Context) error { return nil } + +func NewNoopStreamMetricStore() *NoopStreamMetricStore { return &NoopStreamMetricStore{} } diff --git a/router/pkg/metric/oltp_connection_metric_store.go b/router/pkg/metric/oltp_connection_metric_store.go index 0a4b59e27b..e92f43b72b 100644 --- a/router/pkg/metric/oltp_connection_metric_store.go +++ b/router/pkg/metric/oltp_connection_metric_store.go @@ -43,7 +43,11 @@ func newOtlpConnectionMetrics(logger *zap.Logger, meterProvider *metric.MeterPro meter: meter, } - metrics.startInitMetrics(stats, baseAttributes) + err = metrics.startInitMetrics(stats, baseAttributes) + if err != nil { + return nil, err + } + return metrics, nil } diff --git a/router/pkg/metric/oltp_stream_metric_store.go b/router/pkg/metric/oltp_stream_metric_store.go new file mode 100644 index 0000000000..3d7a8573e9 --- /dev/null +++ b/router/pkg/metric/oltp_stream_metric_store.go @@ -0,0 +1,52 @@ +package metric + +import ( + "context" + + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.uber.org/zap" +) + +const ( + cosmoRouterStreamEventMeterName = "cosmo.router.streams" + cosmoRouterStreamEventMeterVersion = "0.0.1" +) + +type otlpStreamEventMetrics struct { + instruments *eventInstruments + meterProvider *metric.MeterProvider + logger *zap.Logger + meter otelmetric.Meter +} + +func newOtlpStreamEventMetrics(logger *zap.Logger, meterProvider *metric.MeterProvider) (*otlpStreamEventMetrics, error) { + meter := meterProvider.Meter( + cosmoRouterStreamEventMeterName, + otelmetric.WithInstrumentationVersion(cosmoRouterStreamEventMeterVersion), + ) + + instruments, err := newStreamEventInstruments(meter) + if err != nil { + return nil, err + } + + return &otlpStreamEventMetrics{ + instruments: instruments, + meterProvider: meterProvider, + logger: logger, + meter: meter, + }, nil +} + +func (o *otlpStreamEventMetrics) Produce(ctx context.Context, opts ...otelmetric.AddOption) { + o.instruments.producedMessages.Add(ctx, 1, opts...) +} + +func (o *otlpStreamEventMetrics) Consume(ctx context.Context, opts ...otelmetric.AddOption) { + o.instruments.consumedMessages.Add(ctx, 1, opts...) 
+} + +func (o *otlpStreamEventMetrics) Flush(ctx context.Context) error { + return o.meterProvider.ForceFlush(ctx) +} diff --git a/router/pkg/metric/prom_connection_metric_store.go b/router/pkg/metric/prom_connection_metric_store.go index 32ecfc342e..a4a58b14fa 100644 --- a/router/pkg/metric/prom_connection_metric_store.go +++ b/router/pkg/metric/prom_connection_metric_store.go @@ -43,7 +43,11 @@ func newPromConnectionMetrics(logger *zap.Logger, meterProvider *metric.MeterPro logger: logger, } - metrics.startInitMetrics(stats, attributes) + err = metrics.startInitMetrics(stats, attributes) + if err != nil { + return nil, err + } + return metrics, nil } diff --git a/router/pkg/metric/prom_stream_metric_store.go b/router/pkg/metric/prom_stream_metric_store.go new file mode 100644 index 0000000000..30309f2444 --- /dev/null +++ b/router/pkg/metric/prom_stream_metric_store.go @@ -0,0 +1,52 @@ +package metric + +import ( + "context" + + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.uber.org/zap" +) + +const ( + cosmoRouterEventPromMeterName = "cosmo.router.streams.prometheus" + cosmoRouterEventPromMeterVersion = "0.0.1" +) + +type promStreamEventMetrics struct { + instruments *eventInstruments + meterProvider *metric.MeterProvider + logger *zap.Logger + meter otelmetric.Meter +} + +func newPromStreamEventMetrics(logger *zap.Logger, meterProvider *metric.MeterProvider) (*promStreamEventMetrics, error) { + meter := meterProvider.Meter( + cosmoRouterEventPromMeterName, + otelmetric.WithInstrumentationVersion(cosmoRouterEventPromMeterVersion), + ) + + instruments, err := newStreamEventInstruments(meter) + if err != nil { + return nil, err + } + + return &promStreamEventMetrics{ + instruments: instruments, + meterProvider: meterProvider, + logger: logger, + meter: meter, + }, nil +} + +func (p *promStreamEventMetrics) Produce(ctx context.Context, opts ...otelmetric.AddOption) { + p.instruments.producedMessages.Add(ctx, 1, opts...) +} + +func (p *promStreamEventMetrics) Consume(ctx context.Context, opts ...otelmetric.AddOption) { + p.instruments.consumedMessages.Add(ctx, 1, opts...) 
+}
+
+func (p *promStreamEventMetrics) Flush(ctx context.Context) error {
+	return p.meterProvider.ForceFlush(ctx)
+}
diff --git a/router/pkg/metric/stream_measurements.go b/router/pkg/metric/stream_measurements.go
new file mode 100644
index 0000000000..a5e1dadfb1
--- /dev/null
+++ b/router/pkg/metric/stream_measurements.go
@@ -0,0 +1,49 @@
+package metric
+
+import (
+	"fmt"
+
+	otelmetric "go.opentelemetry.io/otel/metric"
+)
+
+const (
+	messagingSentMessages     = "router.streams.sent.messages"
+	messagingConsumedMessages = "router.streams.received.messages"
+)
+
+var (
+	messagingSentMessagesOptions = []otelmetric.Int64CounterOption{
+		otelmetric.WithDescription("Number of stream messages sent"),
+	}
+	messagingConsumedMessagesOptions = []otelmetric.Int64CounterOption{
+		otelmetric.WithDescription("Number of stream messages received"),
+	}
+)
+
+type eventInstruments struct {
+	producedMessages otelmetric.Int64Counter
+	consumedMessages otelmetric.Int64Counter
+}
+
+func newStreamEventInstruments(meter otelmetric.Meter) (*eventInstruments, error) {
+	producedCounter, err := meter.Int64Counter(
+		messagingSentMessages,
+		messagingSentMessagesOptions...,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create sent messages counter: %w", err)
+	}
+
+	consumedCounter, err := meter.Int64Counter(
+		messagingConsumedMessages,
+		messagingConsumedMessagesOptions...,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create received messages counter: %w", err)
+	}
+
+	return &eventInstruments{
+		producedMessages: producedCounter,
+		consumedMessages: consumedCounter,
+	}, nil
+}
diff --git a/router/pkg/metric/stream_metric_store.go b/router/pkg/metric/stream_metric_store.go
new file mode 100644
index 0000000000..2034d2bc70
--- /dev/null
+++ b/router/pkg/metric/stream_metric_store.go
@@ -0,0 +1,153 @@
+package metric
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"go.opentelemetry.io/otel/attribute"
+	otelmetric "go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.uber.org/zap"
+
+	otel "github.com/wundergraph/cosmo/router/pkg/otel"
+)
+
+type ProviderType string
+
+const (
+	ProviderTypeKafka ProviderType = "kafka"
+	ProviderTypeNats  ProviderType = "nats"
+	ProviderTypeRedis ProviderType = "redis"
+)
+
+// StreamsEvent carries the values for stream metrics attributes.
+type StreamsEvent struct {
+	ProviderId          string       // The id of the provider defined in the configuration
+	StreamOperationName string       // The stream operation name, specific to the messaging system
+	ProviderType        ProviderType // The type of messaging system (kafka, nats, or redis)
+	ErrorType           string       // Optional error type, e.g., "publish_error" or "receive_error". If empty, the attribute is not set
+	DestinationName     string       // The name of the destination queue / topic / channel
+}
+
+// StreamMetricProvider is the interface that wraps the basic stream event metric methods.
+type StreamMetricProvider interface {
+	Produce(ctx context.Context, opts ...otelmetric.AddOption)
+	Consume(ctx context.Context, opts ...otelmetric.AddOption)
+
+	Flush(ctx context.Context) error
+}
+
+type StreamMetricStore interface {
+	Produce(ctx context.Context, event StreamsEvent)
+	Consume(ctx context.Context, event StreamsEvent)
+
+	Flush(ctx context.Context) error
+	Shutdown(ctx context.Context) error
+}
+
+// StreamMetrics is the store for stream event (Kafka/NATS/Redis) metrics.
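+// A single store fans out every recorded event to each enabled provider (OTLP and/or Prometheus).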
+type StreamMetrics struct {
+	baseAttributes []attribute.KeyValue
+	logger         *zap.Logger
+	providers      []StreamMetricProvider
+}
+
+func NewStreamMetricStore(logger *zap.Logger, baseAttributes []attribute.KeyValue, otelProvider, promProvider *metric.MeterProvider, metricsConfig *Config) (*StreamMetrics, error) {
+	providers := make([]StreamMetricProvider, 0)
+
+	if metricsConfig.OpenTelemetry.Streams {
+		otlpMetrics, err := newOtlpStreamEventMetrics(logger, otelProvider)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create otlp stream event metrics: %w", err)
+		}
+		providers = append(providers, otlpMetrics)
+	}
+
+	if metricsConfig.Prometheus.Streams {
+		promMetrics, err := newPromStreamEventMetrics(logger, promProvider)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create prometheus stream event metrics: %w", err)
+		}
+		providers = append(providers, promMetrics)
+	}
+
+	store := &StreamMetrics{
+		baseAttributes: baseAttributes,
+		logger:         logger,
+		providers:      providers,
+	}
+	return store, nil
+}
+
+func (e *StreamMetrics) withAttrs(attrs ...attribute.KeyValue) otelmetric.AddOption {
+	copied := append([]attribute.KeyValue{}, e.baseAttributes...)
+	return otelmetric.WithAttributes(append(copied, attrs...)...)
+}
+
+func (e *StreamMetrics) Produce(ctx context.Context, event StreamsEvent) {
+	attrs := []attribute.KeyValue{
+		otel.WgStreamOperationName.String(event.StreamOperationName),
+		otel.WgProviderType.String(string(event.ProviderType)),
+	}
+	if event.ErrorType != "" {
+		attrs = append(attrs, otel.WgErrorType.String(event.ErrorType))
+	}
+	if event.ProviderId != "" {
+		attrs = append(attrs, otel.WgProviderId.String(event.ProviderId))
+	}
+	if event.DestinationName != "" {
+		attrs = append(attrs, otel.WgDestinationName.String(event.DestinationName))
+	}
+	opt := e.withAttrs(attrs...)
+
+	for _, provider := range e.providers {
+		provider.Produce(ctx, opt)
+	}
+}
+
+func (e *StreamMetrics) Consume(ctx context.Context, event StreamsEvent) {
+	attrs := []attribute.KeyValue{
+		otel.WgStreamOperationName.String(event.StreamOperationName),
+		otel.WgProviderType.String(string(event.ProviderType)),
+	}
+	if event.ErrorType != "" {
+		attrs = append(attrs, otel.WgErrorType.String(event.ErrorType))
+	}
+	if event.ProviderId != "" {
+		attrs = append(attrs, otel.WgProviderId.String(event.ProviderId))
+	}
+	if event.DestinationName != "" {
+		attrs = append(attrs, otel.WgDestinationName.String(event.DestinationName))
+	}
+
+	opt := e.withAttrs(attrs...)
+
+	for _, provider := range e.providers {
+		provider.Consume(ctx, opt)
+	}
+}
+
+// Flush flushes the metrics to the backend synchronously.
+func (e *StreamMetrics) Flush(ctx context.Context) error {
+	var err error
+
+	for _, provider := range e.providers {
+		if errFlush := provider.Flush(ctx); errFlush != nil {
+			err = errors.Join(err, fmt.Errorf("failed to flush metrics: %w", errFlush))
+		}
+	}
+
+	return err
+}
+
+// Shutdown flushes the metrics and stops observers if any.
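+// At the moment there are no observers to stop, so Shutdown only delegates to Flush.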
+func (e *StreamMetrics) Shutdown(ctx context.Context) error {
+	var err error
+
+	if errFlush := e.Flush(ctx); errFlush != nil {
+		err = errors.Join(err, fmt.Errorf("failed to flush metrics: %w", errFlush))
+	}
+
+	return err
+}
diff --git a/router/pkg/otel/attributes.go b/router/pkg/otel/attributes.go
index d1ee5db243..08b3b96d95 100644
--- a/router/pkg/otel/attributes.go
+++ b/router/pkg/otel/attributes.go
@@ -1,8 +1,9 @@
 package otel
 
 import (
-	"go.opentelemetry.io/otel/attribute"
 	"net"
+
+	"go.opentelemetry.io/otel/attribute"
 )
 
 const (
@@ -59,6 +60,15 @@
 	WgGraphQLParentType = attribute.Key("wg.graphql.parent_type")
 )
 
+// Messaging metrics attributes
+const (
+	WgStreamOperationName = attribute.Key("wg.stream.operation.name")
+	WgProviderType        = attribute.Key("wg.provider.type")
+	WgDestinationName     = attribute.Key("wg.destination.name")
+	WgProviderId          = attribute.Key("wg.provider.id")
+	WgErrorType           = attribute.Key("wg.error.type")
+)
+
 const (
 	CacheMetricsOperationTypeAdded   = "added"
 	CacheMetricsOperationTypeUpdated = "updated"
diff --git a/router/pkg/pubsub/datasource/mocks.go b/router/pkg/pubsub/datasource/mocks.go
index 067da9c86c..a6bbb19e18 100644
--- a/router/pkg/pubsub/datasource/mocks.go
+++ b/router/pkg/pubsub/datasource/mocks.go
@@ -792,7 +792,7 @@ func (_c *MockProviderBuilder_BuildEngineDataSourceFactory_Call[P, E]) RunAndRet
 }
 
 // BuildProvider provides a mock function for the type MockProviderBuilder
-func (_mock *MockProviderBuilder[P, E]) BuildProvider(options P) (Provider, error) {
-	ret := _mock.Called(options)
+func (_mock *MockProviderBuilder[P, E]) BuildProvider(options P, providerOpts ProviderOpts) (Provider, error) {
+	ret := _mock.Called(options, providerOpts)
 
 	if len(ret) == 0 {
diff --git a/router/pkg/pubsub/datasource/provider.go b/router/pkg/pubsub/datasource/provider.go
index f90446a712..d9138630ca 100644
--- a/router/pkg/pubsub/datasource/provider.go
+++ b/router/pkg/pubsub/datasource/provider.go
@@ -2,6 +2,8 @@ package datasource
 
 import (
 	"context"
+
+	"github.com/wundergraph/cosmo/router/pkg/metric"
 )
 
 type ArgumentTemplateCallback func(tpl string) (string, error)
@@ -27,7 +29,11 @@ type ProviderBuilder[P, E any] interface {
	// TypeID Get the provider type id (e.g.
"kafka", "nats") TypeID() string // BuildProvider Build the provider and the adapter - BuildProvider(options P) (Provider, error) + BuildProvider(options P, providerOpts ProviderOpts) (Provider, error) // BuildEngineDataSourceFactory Build the data source for the given provider and event configuration BuildEngineDataSourceFactory(data E) (EngineDataSourceFactory, error) } + +type ProviderOpts struct { + StreamMetricStore metric.StreamMetricStore +} diff --git a/router/pkg/pubsub/kafka/adapter.go b/router/pkg/pubsub/kafka/adapter.go index 503b8f6f37..e11993b668 100644 --- a/router/pkg/pubsub/kafka/adapter.go +++ b/router/pkg/pubsub/kafka/adapter.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/wundergraph/cosmo/router/pkg/metric" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" @@ -19,6 +21,11 @@ var ( errClientClosed = errors.New("client closed") ) +const ( + kafkaReceive = "receive" + kafkaProduce = "produce" +) + // Adapter defines the interface for Kafka adapter operations type Adapter interface { Subscribe(ctx context.Context, event SubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error @@ -33,16 +40,21 @@ type Adapter interface { // It uses a single write client to produce messages and a client per topic to consume messages. // Each client polls the Kafka topic for new records and updates the subscriptions with the new data. type ProviderAdapter struct { - ctx context.Context - opts []kgo.Opt - logger *zap.Logger - writeClient *kgo.Client - closeWg sync.WaitGroup - cancel context.CancelFunc + ctx context.Context + opts []kgo.Opt + logger *zap.Logger + writeClient *kgo.Client + closeWg sync.WaitGroup + cancel context.CancelFunc + streamMetricStore metric.StreamMetricStore +} + +type PollerOpts struct { + providerId string } // topicPoller polls the Kafka topic for new records and calls the updateTriggers function. 
-func (p *ProviderAdapter) topicPoller(ctx context.Context, client *kgo.Client, updater resolve.SubscriptionUpdater) error { +func (p *ProviderAdapter) topicPoller(ctx context.Context, client *kgo.Client, updater resolve.SubscriptionUpdater, pollerOpts PollerOpts) error { for { select { case <-p.ctx.Done(): // Close the poller if the application context was canceled @@ -88,6 +100,12 @@ func (p *ProviderAdapter) topicPoller(ctx context.Context, client *kgo.Client, u r := iter.Next() p.logger.Debug("subscription update", zap.String("topic", r.Topic), zap.ByteString("data", r.Value)) + p.streamMetricStore.Consume(p.ctx, metric.StreamsEvent{ + ProviderId: pollerOpts.providerId, + StreamOperationName: kafkaReceive, + ProviderType: metric.ProviderTypeKafka, + DestinationName: r.Topic, + }) updater.Update(r.Value) } } @@ -128,7 +146,7 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEvent defer p.closeWg.Done() - err := p.topicPoller(ctx, client, updater) + err := p.topicPoller(ctx, client, updater, PollerOpts{providerId: event.ProviderID}) if err != nil { if errors.Is(err, errClientClosed) || errors.Is(err, context.Canceled) { log.Debug("poller canceled", zap.Error(err)) @@ -178,9 +196,23 @@ func (p *ProviderAdapter) Publish(ctx context.Context, event PublishEventConfigu if pErr != nil { log.Error("publish error", zap.Error(pErr)) + // failure emission: include error.type generic + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: kafkaProduce, + ProviderType: metric.ProviderTypeKafka, + ErrorType: "publish_error", + DestinationName: event.Topic, + }) return datasource.NewError(fmt.Sprintf("error publishing to Kafka topic %s", event.Topic), pErr) } + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: kafkaProduce, + ProviderType: metric.ProviderTypeKafka, + DestinationName: event.Topic, + }) return nil } @@ -222,17 +254,25 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { return nil } -func NewProviderAdapter(ctx context.Context, logger *zap.Logger, opts []kgo.Opt) (*ProviderAdapter, error) { +func NewProviderAdapter(ctx context.Context, logger *zap.Logger, opts []kgo.Opt, providerOpts datasource.ProviderOpts) (*ProviderAdapter, error) { ctx, cancel := context.WithCancel(ctx) if logger == nil { logger = zap.NewNop() } + var store metric.StreamMetricStore + if providerOpts.StreamMetricStore != nil { + store = providerOpts.StreamMetricStore + } else { + store = metric.NewNoopStreamMetricStore() + } + return &ProviderAdapter{ - ctx: ctx, - logger: logger.With(zap.String("pubsub", "kafka")), - opts: opts, - closeWg: sync.WaitGroup{}, - cancel: cancel, + ctx: ctx, + logger: logger.With(zap.String("pubsub", "kafka")), + opts: opts, + closeWg: sync.WaitGroup{}, + cancel: cancel, + streamMetricStore: store, }, nil } diff --git a/router/pkg/pubsub/kafka/provider_builder.go b/router/pkg/pubsub/kafka/provider_builder.go index 3007b1fafe..c88cf814c2 100644 --- a/router/pkg/pubsub/kafka/provider_builder.go +++ b/router/pkg/pubsub/kafka/provider_builder.go @@ -56,8 +56,8 @@ func (p *ProviderBuilder) BuildEngineDataSourceFactory(data *nodev1.KafkaEventCo }, nil } -func (p *ProviderBuilder) BuildProvider(provider config.KafkaEventSource) (datasource.Provider, error) { - adapter, pubSubProvider, err := buildProvider(p.ctx, provider, p.logger) +func (p *ProviderBuilder) BuildProvider(provider config.KafkaEventSource, providerOpts 
datasource.ProviderOpts) (datasource.Provider, error) { + adapter, pubSubProvider, err := buildProvider(p.ctx, provider, p.logger, providerOpts) if err != nil { return nil, err } @@ -150,12 +150,12 @@ func buildKafkaOptions(eventSource config.KafkaEventSource, logger *zap.Logger) return opts, nil } -func buildProvider(ctx context.Context, provider config.KafkaEventSource, logger *zap.Logger) (Adapter, datasource.Provider, error) { - options, err := buildKafkaOptions(provider, logger) +func buildProvider(ctx context.Context, provider config.KafkaEventSource, logger *zap.Logger, providerOpts datasource.ProviderOpts) (Adapter, datasource.Provider, error) { + kafkaOpts, err := buildKafkaOptions(provider, logger) if err != nil { return nil, nil, fmt.Errorf("failed to build options for Kafka provider with ID \"%s\": %w", provider.ID, err) } - adapter, err := NewProviderAdapter(ctx, logger, options) + adapter, err := NewProviderAdapter(ctx, logger, kafkaOpts, providerOpts) if err != nil { return nil, nil, fmt.Errorf("failed to create adapter for Kafka provider with ID \"%s\": %w", provider.ID, err) } diff --git a/router/pkg/pubsub/kafka/provider_builder_test.go b/router/pkg/pubsub/kafka/provider_builder_test.go index 5d72d28d91..6afa7612e4 100644 --- a/router/pkg/pubsub/kafka/provider_builder_test.go +++ b/router/pkg/pubsub/kafka/provider_builder_test.go @@ -93,7 +93,7 @@ func TestPubSubProviderBuilderFactory(t *testing.T) { builder := NewProviderBuilder(ctx, logger, "host", "addr") require.NotNil(t, builder) - provider, err := builder.BuildProvider(cfg) + provider, err := builder.BuildProvider(cfg, datasource.ProviderOpts{}) require.NoError(t, err) // Check the returned provider diff --git a/router/pkg/pubsub/nats/adapter.go b/router/pkg/pubsub/nats/adapter.go index a0bef13f45..d10f8cf93d 100644 --- a/router/pkg/pubsub/nats/adapter.go +++ b/router/pkg/pubsub/nats/adapter.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/wundergraph/cosmo/router/pkg/metric" + "github.com/cespare/xxhash/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" @@ -16,6 +18,12 @@ import ( "go.uber.org/zap" ) +const ( + natsRequest = "request" + natsPublish = "publish" + natsReceive = "receive" +) + // Adapter defines the methods that a NATS adapter should implement type Adapter interface { // Subscribe subscribes to the given events and sends updates to the updater @@ -32,16 +40,17 @@ type Adapter interface { // ProviderAdapter implements the AdapterInterface for NATS pub/sub type ProviderAdapter struct { - ctx context.Context - client *nats.Conn - js jetstream.JetStream - logger *zap.Logger - closeWg sync.WaitGroup - hostName string - routerListenAddr string - url string - opts []nats.Option - flushTimeout time.Duration + ctx context.Context + client *nats.Conn + js jetstream.JetStream + logger *zap.Logger + closeWg sync.WaitGroup + hostName string + routerListenAddr string + url string + opts []nats.Option + flushTimeout time.Duration + streamMetricStore metric.StreamMetricStore } // getInstanceIdentifier returns an identifier for the current instance. 
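Since every adapter receives its metric store through datasource.ProviderOpts and falls back to the noop store when none is set, anything that satisfies metric.StreamMetricStore can be swapped in, e.g. by tests or custom router builds. A minimal sketch under that assumption; the logStore type below is hypothetical and not part of this change:

```go
package main

import (
	"context"
	"log"

	"github.com/wundergraph/cosmo/router/pkg/metric"
)

// logStore is a hypothetical StreamMetricStore that logs stream events
// instead of recording OpenTelemetry counters.
type logStore struct{}

func (s *logStore) Produce(_ context.Context, ev metric.StreamsEvent) {
	log.Printf("produce op=%s provider=%s destination=%s", ev.StreamOperationName, ev.ProviderId, ev.DestinationName)
}

func (s *logStore) Consume(_ context.Context, ev metric.StreamsEvent) {
	log.Printf("consume op=%s provider=%s destination=%s", ev.StreamOperationName, ev.ProviderId, ev.DestinationName)
}

func (s *logStore) Flush(context.Context) error    { return nil }
func (s *logStore) Shutdown(context.Context) error { return nil }

// Compile-time check that logStore satisfies the interface the adapters expect.
var _ metric.StreamMetricStore = (*logStore)(nil)

func main() {
	// The adapters would receive this via datasource.ProviderOpts{StreamMetricStore: &logStore{}}.
	var store metric.StreamMetricStore = &logStore{}
	store.Produce(context.Background(), metric.StreamsEvent{
		StreamOperationName: "publish",
		ProviderType:        metric.ProviderTypeNats,
		ProviderId:          "my-nats",
		DestinationName:     "employeeUpdatedMyNats.12",
	})
}
```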
@@ -132,6 +141,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEvent for msg := range msgBatch.Messages() { log.Debug("subscription update", zap.String("message_subject", msg.Subject()), zap.ByteString("data", msg.Data())) + p.streamMetricStore.Consume(p.ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsReceive, + ProviderType: metric.ProviderTypeNats, + DestinationName: msg.Subject(), + }) updater.Update(msg.Data()) // Acknowledge the message after it has been processed @@ -169,6 +184,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEvent select { case msg := <-msgChan: log.Debug("subscription update", zap.String("message_subject", msg.Subject), zap.ByteString("data", msg.Data)) + p.streamMetricStore.Consume(p.ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsReceive, + ProviderType: metric.ProviderTypeNats, + DestinationName: msg.Subject, + }) updater.Update(msg.Data) case <-p.ctx.Done(): // When the application context is done, we stop the subscriptions @@ -197,7 +218,7 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEvent return nil } -func (p *ProviderAdapter) Publish(_ context.Context, event PublishAndRequestEventConfiguration) error { +func (p *ProviderAdapter) Publish(ctx context.Context, event PublishAndRequestEventConfiguration) error { log := p.logger.With( zap.String("provider_id", event.ProviderID), zap.String("method", "publish"), @@ -213,7 +234,21 @@ func (p *ProviderAdapter) Publish(_ context.Context, event PublishAndRequestEven err := p.client.Publish(event.Subject, event.Data) if err != nil { log.Error("publish error", zap.Error(err)) + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsPublish, + ProviderType: metric.ProviderTypeNats, + ErrorType: "publish_error", + DestinationName: event.Subject, + }) return datasource.NewError(fmt.Sprintf("error publishing to NATS subject %s", event.Subject), err) + } else { + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsPublish, + ProviderType: metric.ProviderTypeNats, + DestinationName: event.Subject, + }) } return nil @@ -235,9 +270,24 @@ func (p *ProviderAdapter) Request(ctx context.Context, event PublishAndRequestEv msg, err := p.client.RequestWithContext(ctx, event.Subject, event.Data) if err != nil { log.Error("request error", zap.Error(err)) + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsRequest, + ProviderType: metric.ProviderTypeNats, + ErrorType: "request_error", + DestinationName: event.Subject, + }) return datasource.NewError(fmt.Sprintf("error requesting from NATS subject %s", event.Subject), err) } + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: natsRequest, + ProviderType: metric.ProviderTypeNats, + DestinationName: event.Subject, + }) + + // We don't collect metrics on err here as it's an error related to the writer _, err = w.Write(msg.Data) if err != nil { log.Error("error writing response to writer", zap.Error(err)) @@ -303,19 +353,27 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { return nil } -func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string) (Adapter, error) { +func NewAdapter(ctx context.Context, 
logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, error) { if logger == nil { logger = zap.NewNop() } + var store metric.StreamMetricStore + if providerOpts.StreamMetricStore != nil { + store = providerOpts.StreamMetricStore + } else { + store = metric.NewNoopStreamMetricStore() + } + return &ProviderAdapter{ - ctx: ctx, - logger: logger.With(zap.String("pubsub", "nats")), - closeWg: sync.WaitGroup{}, - hostName: hostName, - routerListenAddr: routerListenAddr, - url: url, - opts: opts, - flushTimeout: 10 * time.Second, + ctx: ctx, + logger: logger.With(zap.String("pubsub", "nats")), + closeWg: sync.WaitGroup{}, + hostName: hostName, + routerListenAddr: routerListenAddr, + url: url, + opts: opts, + flushTimeout: 10 * time.Second, + streamMetricStore: store, }, nil } diff --git a/router/pkg/pubsub/nats/provider_builder.go b/router/pkg/pubsub/nats/provider_builder.go index e1ae0a1e70..e3ba5f7cb0 100644 --- a/router/pkg/pubsub/nats/provider_builder.go +++ b/router/pkg/pubsub/nats/provider_builder.go @@ -64,8 +64,8 @@ func (p *ProviderBuilder) BuildEngineDataSourceFactory(data *nodev1.NatsEventCon return dataSourceFactory, nil } -func (p *ProviderBuilder) BuildProvider(provider config.NatsEventSource) (datasource.Provider, error) { - adapter, pubSubProvider, err := buildProvider(p.ctx, provider, p.logger, p.hostName, p.routerListenAddr) +func (p *ProviderBuilder) BuildProvider(provider config.NatsEventSource, providerOpts datasource.ProviderOpts) (datasource.Provider, error) { + adapter, pubSubProvider, err := buildProvider(p.ctx, provider, p.logger, p.hostName, p.routerListenAddr, providerOpts) if err != nil { return nil, err } @@ -118,12 +118,12 @@ func buildNatsOptions(eventSource config.NatsEventSource, logger *zap.Logger) ([ return opts, nil } -func buildProvider(ctx context.Context, provider config.NatsEventSource, logger *zap.Logger, hostName string, routerListenAddr string) (Adapter, datasource.Provider, error) { +func buildProvider(ctx context.Context, provider config.NatsEventSource, logger *zap.Logger, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, datasource.Provider, error) { options, err := buildNatsOptions(provider, logger) if err != nil { return nil, nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err) } - adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr) + adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, providerOpts) if err != nil { return nil, nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err) } diff --git a/router/pkg/pubsub/nats/provider_builder_test.go b/router/pkg/pubsub/nats/provider_builder_test.go index d2646b1941..407e6bde3c 100644 --- a/router/pkg/pubsub/nats/provider_builder_test.go +++ b/router/pkg/pubsub/nats/provider_builder_test.go @@ -82,7 +82,7 @@ func TestPubSubProviderBuilderFactory(t *testing.T) { builder := NewProviderBuilder(ctx, logger, "host", "addr") require.NotNil(t, builder) - provider, err := builder.BuildProvider(cfg) + provider, err := builder.BuildProvider(cfg, datasource.ProviderOpts{}) require.NoError(t, err) // Check the returned provider diff --git a/router/pkg/pubsub/pubsub.go b/router/pkg/pubsub/pubsub.go index c6ec29be82..b92aaad6f7 100644 --- a/router/pkg/pubsub/pubsub.go +++ b/router/pkg/pubsub/pubsub.go @@ -6,6 +6,8 @@ import ( 
"slices" "strconv" + "github.com/wundergraph/cosmo/router/pkg/metric" + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" pubsub_datasource "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" @@ -51,14 +53,11 @@ func (e *ProviderNotDefinedError) Error() string { // BuildProvidersAndDataSources is a generic function that builds providers and data sources for the given // EventsConfiguration and DataSourceConfigurationWithMetadata -func BuildProvidersAndDataSources( - ctx context.Context, - config config.EventsConfiguration, - logger *zap.Logger, - dsConfs []DataSourceConfigurationWithMetadata, - hostName string, - routerListenAddr string, -) ([]pubsub_datasource.Provider, []plan.DataSource, error) { +func BuildProvidersAndDataSources(ctx context.Context, config config.EventsConfiguration, store metric.StreamMetricStore, logger *zap.Logger, dsConfs []DataSourceConfigurationWithMetadata, hostName string, routerListenAddr string) ([]pubsub_datasource.Provider, []plan.DataSource, error) { + if store == nil { + store = metric.NewNoopStreamMetricStore() + } + var pubSubProviders []pubsub_datasource.Provider var outs []plan.DataSource @@ -71,7 +70,7 @@ func BuildProvidersAndDataSources( events: dsConf.Configuration.GetCustomEvents().GetKafka(), }) } - kafkaPubSubProviders, kafkaOuts, err := build(ctx, kafkaBuilder, config.Providers.Kafka, kafkaDsConfsWithEvents) + kafkaPubSubProviders, kafkaOuts, err := build(ctx, kafkaBuilder, config.Providers.Kafka, kafkaDsConfsWithEvents, store) if err != nil { return nil, nil, err } @@ -87,7 +86,7 @@ func BuildProvidersAndDataSources( events: dsConf.Configuration.GetCustomEvents().GetNats(), }) } - natsPubSubProviders, natsOuts, err := build(ctx, natsBuilder, config.Providers.Nats, natsDsConfsWithEvents) + natsPubSubProviders, natsOuts, err := build(ctx, natsBuilder, config.Providers.Nats, natsDsConfsWithEvents, store) if err != nil { return nil, nil, err } @@ -103,7 +102,7 @@ func BuildProvidersAndDataSources( events: dsConf.Configuration.GetCustomEvents().GetRedis(), }) } - redisPubSubProviders, redisOuts, err := build(ctx, redisBuilder, config.Providers.Redis, redisDsConfsWithEvents) + redisPubSubProviders, redisOuts, err := build(ctx, redisBuilder, config.Providers.Redis, redisDsConfsWithEvents, store) if err != nil { return nil, nil, err } @@ -113,7 +112,13 @@ func BuildProvidersAndDataSources( return pubSubProviders, outs, nil } -func build[P GetID, E GetEngineEventConfiguration](ctx context.Context, builder pubsub_datasource.ProviderBuilder[P, E], providersData []P, dsConfs []dsConfAndEvents[E]) ([]pubsub_datasource.Provider, []plan.DataSource, error) { +func build[P GetID, E GetEngineEventConfiguration]( + ctx context.Context, + builder pubsub_datasource.ProviderBuilder[P, E], + providersData []P, + dsConfs []dsConfAndEvents[E], + store metric.StreamMetricStore, +) ([]pubsub_datasource.Provider, []plan.DataSource, error) { var pubSubProviders []pubsub_datasource.Provider var outs []plan.DataSource @@ -133,7 +138,9 @@ func build[P GetID, E GetEngineEventConfiguration](ctx context.Context, builder if !slices.Contains(usedProviderIds, providerData.GetID()) { continue } - provider, err := builder.BuildProvider(providerData) + provider, err := builder.BuildProvider(providerData, pubsub_datasource.ProviderOpts{ + StreamMetricStore: store, + }) if err != nil { return nil, nil, err } diff --git a/router/pkg/pubsub/pubsub_test.go b/router/pkg/pubsub/pubsub_test.go index 
a76194f7c5..2173e46c3d 100644 --- a/router/pkg/pubsub/pubsub_test.go +++ b/router/pkg/pubsub/pubsub_test.go @@ -3,6 +3,8 @@ package pubsub import ( "context" "errors" + "github.com/stretchr/testify/mock" + rmetric "github.com/wundergraph/cosmo/router/pkg/metric" "testing" "github.com/stretchr/testify/assert" @@ -65,7 +67,7 @@ func TestBuild_OK(t *testing.T) { // ctx, kafkaBuilder, config.Providers.Kafka, kafkaDsConfsWithEvents // Execute the function - providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs) + providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs, rmetric.NewNoopStreamMetricStore()) // Assertions assert.NoError(t, err) @@ -118,10 +120,10 @@ func TestBuild_ProviderError(t *testing.T) { {ID: "provider-1"}, } - mockBuilder.On("BuildProvider", natsEventSources[0]).Return(nil, errors.New("provider error")) + mockBuilder.On("BuildProvider", natsEventSources[0], mock.Anything).Return(nil, errors.New("provider error")) // Execute the function - providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs) + providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs, rmetric.NewNoopStreamMetricStore()) // Assertions assert.Error(t, err) @@ -176,7 +178,7 @@ func TestBuild_ShouldGetAnErrorIfProviderIsNotDefined(t *testing.T) { mockBuilder.On("TypeID").Return("nats") // Execute the function - providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs) + providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs, rmetric.NewNoopStreamMetricStore()) // Assertions assert.Error(t, err) @@ -236,10 +238,11 @@ func TestBuild_ShouldNotInitializeProviderIfNotUsed(t *testing.T) { mockPubSubUsedProvider.On("ID").Return("provider-2") mockBuilder.On("TypeID").Return("nats") - mockBuilder.On("BuildProvider", natsEventSources[1]).Return(mockPubSubUsedProvider, nil) + mockBuilder.On("BuildProvider", natsEventSources[1], mock.Anything). 
+ Return(mockPubSubUsedProvider, nil) // Execute the function - providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs) + providers, dataSources, err := build(ctx, mockBuilder, natsEventSources, dsConfs, rmetric.NewNoopStreamMetricStore()) // Assertions assert.NoError(t, err) @@ -290,7 +293,7 @@ func TestBuildProvidersAndDataSources_Nats_OK(t *testing.T) { {ID: "provider-1"}, }, }, - }, zap.NewNop(), dsConfs, "host", "addr") + }, nil, zap.NewNop(), dsConfs, "host", "addr") // Assertions assert.NoError(t, err) @@ -343,7 +346,7 @@ func TestBuildProvidersAndDataSources_Kafka_OK(t *testing.T) { {ID: "provider-1"}, }, }, - }, zap.NewNop(), dsConfs, "host", "addr") + }, nil, zap.NewNop(), dsConfs, "host", "addr") // Assertions assert.NoError(t, err) @@ -396,7 +399,7 @@ func TestBuildProvidersAndDataSources_Redis_OK(t *testing.T) { {ID: "provider-1"}, }, }, - }, zap.NewNop(), dsConfs, "host", "addr") + }, nil, zap.NewNop(), dsConfs, "host", "addr") // Assertions assert.NoError(t, err) diff --git a/router/pkg/pubsub/redis/adapter.go b/router/pkg/pubsub/redis/adapter.go index 3efcabbf92..13f0cbb0e2 100644 --- a/router/pkg/pubsub/redis/adapter.go +++ b/router/pkg/pubsub/redis/adapter.go @@ -5,12 +5,19 @@ import ( "fmt" "sync" + "github.com/wundergraph/cosmo/router/pkg/metric" + rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis" "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "go.uber.org/zap" ) +const ( + redisPublish = "publish" + redisReceive = "receive" +) + // Adapter defines the methods that a Redis adapter should implement type Adapter interface { // Subscribe subscribes to the given events and sends updates to the updater @@ -23,25 +30,38 @@ type Adapter interface { Shutdown(ctx context.Context) error } -func NewProviderAdapter(ctx context.Context, logger *zap.Logger, urls []string, clusterEnabled bool) Adapter { +func NewProviderAdapter(ctx context.Context, logger *zap.Logger, urls []string, clusterEnabled bool, opts datasource.ProviderOpts) Adapter { ctx, cancel := context.WithCancel(ctx) + if logger == nil { + logger = zap.NewNop() + } + + var store metric.StreamMetricStore + if opts.StreamMetricStore != nil { + store = opts.StreamMetricStore + } else { + store = metric.NewNoopStreamMetricStore() + } + return &ProviderAdapter{ - ctx: ctx, - cancel: cancel, - logger: logger, - urls: urls, - clusterEnabled: clusterEnabled, + ctx: ctx, + cancel: cancel, + logger: logger, + urls: urls, + clusterEnabled: clusterEnabled, + streamMetricStore: store, } } type ProviderAdapter struct { - ctx context.Context - cancel context.CancelFunc - conn rd.RDCloser - logger *zap.Logger - closeWg sync.WaitGroup - urls []string - clusterEnabled bool + ctx context.Context + cancel context.CancelFunc + conn rd.RDCloser + logger *zap.Logger + closeWg sync.WaitGroup + urls []string + clusterEnabled bool + streamMetricStore metric.StreamMetricStore } func (p *ProviderAdapter) Startup(ctx context.Context) error { @@ -107,6 +127,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEvent return } log.Debug("subscription update", zap.String("message_channel", msg.Channel), zap.String("data", msg.Payload)) + p.streamMetricStore.Consume(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: redisReceive, + ProviderType: metric.ProviderTypeRedis, + DestinationName: msg.Channel, + }) updater.Update([]byte(msg.Payload)) case 
<-p.ctx.Done(): // When the application context is done, we stop the subscription if it is not already done @@ -145,8 +171,21 @@ func (p *ProviderAdapter) Publish(ctx context.Context, event PublishEventConfigu intCmd := p.conn.Publish(ctx, event.Channel, data) if intCmd.Err() != nil { log.Error("publish error", zap.Error(intCmd.Err())) + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: redisPublish, + ProviderType: metric.ProviderTypeRedis, + ErrorType: "publish_error", + DestinationName: event.Channel, + }) return datasource.NewError(fmt.Sprintf("error publishing to Redis PubSub channel %s", event.Channel), intCmd.Err()) } + p.streamMetricStore.Produce(ctx, metric.StreamsEvent{ + ProviderId: event.ProviderID, + StreamOperationName: redisPublish, + ProviderType: metric.ProviderTypeRedis, + DestinationName: event.Channel, + }) return nil } diff --git a/router/pkg/pubsub/redis/provider_builder.go b/router/pkg/pubsub/redis/provider_builder.go index 415963b885..46340934bd 100644 --- a/router/pkg/pubsub/redis/provider_builder.go +++ b/router/pkg/pubsub/redis/provider_builder.go @@ -66,8 +66,8 @@ func (b *ProviderBuilder) BuildEngineDataSourceFactory(data *nodev1.RedisEventCo } // Providers returns the Redis PubSub providers for the given provider IDs -func (b *ProviderBuilder) BuildProvider(provider config.RedisEventSource) (datasource.Provider, error) { - adapter := NewProviderAdapter(b.ctx, b.logger, provider.URLs, provider.ClusterEnabled) +func (b *ProviderBuilder) BuildProvider(provider config.RedisEventSource, providerOpts datasource.ProviderOpts) (datasource.Provider, error) { + adapter := NewProviderAdapter(b.ctx, b.logger, provider.URLs, provider.ClusterEnabled, providerOpts) pubSubProvider := datasource.NewPubSubProvider(provider.ID, providerTypeID, adapter, b.logger) b.adapters[provider.ID] = adapter diff --git a/router/pkg/pubsub/redis/provider_builder_test.go b/router/pkg/pubsub/redis/provider_builder_test.go index 2cbaad10b9..58963dbbc6 100644 --- a/router/pkg/pubsub/redis/provider_builder_test.go +++ b/router/pkg/pubsub/redis/provider_builder_test.go @@ -21,7 +21,7 @@ func TestBuildRedisOptions(t *testing.T) { logger := zaptest.NewLogger(t) ctx := context.Background() builder := NewProviderBuilder(ctx, logger, "host", "addr") - provider, err := builder.BuildProvider(cfg) + provider, err := builder.BuildProvider(cfg, datasource.ProviderOpts{}) require.NoError(t, err) require.NotNil(t, provider) @@ -39,7 +39,7 @@ func TestBuildRedisOptions(t *testing.T) { logger := zaptest.NewLogger(t) ctx := context.Background() builder := NewProviderBuilder(ctx, logger, "host", "addr") - provider, err := builder.BuildProvider(cfg) + provider, err := builder.BuildProvider(cfg, datasource.ProviderOpts{}) require.NoError(t, err) require.NotNil(t, provider) @@ -63,7 +63,7 @@ func TestPubSubProviderBuilderFactory(t *testing.T) { builder := NewProviderBuilder(ctx, logger, "host", "addr") require.NotNil(t, builder) - provider, err := builder.BuildProvider(cfg) + provider, err := builder.BuildProvider(cfg, datasource.ProviderOpts{}) require.NoError(t, err) // Check the returned provider
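Taken together, the pieces above can be exercised outside the router with an SDK manual reader, mirroring what the integration tests assert. A minimal sketch, assuming only the constructor and config fields introduced in this diff:

```go
package main

import (
	"context"
	"fmt"

	rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	// A manual reader lets us pull counter values on demand, as the tests do.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

	// Only the OTLP side is enabled, so the Prometheus provider is never consulted.
	store, err := rmetric.NewStreamMetricStore(
		zap.NewNop(),
		nil, // no base attributes
		provider,
		provider,
		&rmetric.Config{OpenTelemetry: rmetric.OpenTelemetry{Streams: true}},
	)
	if err != nil {
		panic(err)
	}

	// Record one sent message, the same call the Kafka adapter makes after a successful publish.
	store.Produce(ctx, rmetric.StreamsEvent{
		ProviderId:          "my-kafka",
		StreamOperationName: "produce",
		ProviderType:        rmetric.ProviderTypeKafka,
		DestinationName:     "employeeUpdated",
	})

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		panic(err)
	}
	for _, scope := range rm.ScopeMetrics {
		fmt.Println(scope.Scope.Name) // expect "cosmo.router.streams"
	}
}
```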