Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d86e5cc
feat: retry settings config per subgraph
SkArchon Sep 1, 2025
39a200c
fix: review comments
SkArchon Sep 1, 2025
a16a8f9
fix: add test
SkArchon Sep 1, 2025
a8a9cc2
fix: tests
SkArchon Sep 1, 2025
79cdb75
fix: add default options to subgraphs only when required
SkArchon Sep 1, 2025
3344631
fix: review comments
SkArchon Sep 1, 2025
e3cebe5
fix: cleanup formatting
SkArchon Sep 2, 2025
1c9baf2
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 2, 2025
c6c224b
fix: return error
SkArchon Sep 3, 2025
dae502d
fix: review comments
SkArchon Sep 3, 2025
f4f09e1
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 3, 2025
3c7647c
fix: tests
SkArchon Sep 3, 2025
c7b4040
fix: review comments
SkArchon Sep 3, 2025
0ddb674
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 3, 2025
24fbca2
Merge remote-tracking branch 'origin/main' into milinda/retry-per-sub…
SkArchon Sep 16, 2025
a976f16
fix: merge resolving
SkArchon Sep 16, 2025
f4090fa
fix: upgrade request
SkArchon Sep 16, 2025
bceee7e
fix: feature flags
SkArchon Sep 17, 2025
98dde91
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 17, 2025
3131bb7
fix: linting
SkArchon Sep 17, 2025
f353169
fix: linting
SkArchon Sep 17, 2025
72c8268
fix: ci tests
SkArchon Sep 17, 2025
db1964c
fix: increase duration
SkArchon Sep 17, 2025
9826e24
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Oct 16, 2025
8cc3911
Merge remote-tracking branch 'origin/main' into milinda/retry-per-sub…
SkArchon Apr 7, 2026
f37ac89
fix: gofmt
SkArchon Apr 7, 2026
9ec4d8c
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Apr 7, 2026
a4d460b
fix: circuit breaker tests
SkArchon Apr 7, 2026
be2b7c9
fix: tests
SkArchon Apr 7, 2026
dfb90dd
fix: tests
SkArchon Apr 7, 2026
4f52652
fix: changes
SkArchon Apr 21, 2026
65b1db8
fix: retry updates
SkArchon Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions router-tests/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration

import (
"context"
"github.com/gorilla/websocket"
"net/http"
"sort"
"sync/atomic"
Expand Down Expand Up @@ -623,6 +624,139 @@ func TestCircuitBreaker(t *testing.T) {
})
})

t.Run("verify circuit breaker tripping on upgrade requests", func(t *testing.T) {
t.Parallel()

const failedTries int64 = 3

const timestampMessage = `{"type":"next","id":"1","payload":{"data":{"currentTime":{"unixTime":1,"timeStamp":"2021-09-01T12:00:00Z"}}}}`
const completeMessage = `{"type":"complete","id":"1"}`
const defaultErrorMessage = `{"id":"1","type":"error","payload":[{"message":"Internal server error"}]}`

breaker := getCircuitBreakerWithDefaults()
breaker.RequestThreshold = 3
breaker.ErrorThresholdPercentage = 100

breaker.NumBuckets = 1
breaker.RollingDuration = 5000 * time.Millisecond

trafficConfig := getTrafficConfigWithTimeout(breaker, 1*time.Second)

employeesCalls := atomic.Int64{}

testenv.Run(t, &testenv.Config{
ModifyEngineExecutionConfiguration: func(engineExecutionConfiguration *config.EngineExecutionConfiguration) {
engineExecutionConfiguration.WebSocketClientReadTimeout = time.Millisecond * 2000
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.DebugLevel,
},
RouterOptions: []core.Option{
core.WithSubgraphCircuitBreakerOptions(core.NewSubgraphCircuitBreakerOptions(trafficConfig)),
core.WithSubgraphTransportOptions(core.NewSubgraphTransportOptions(trafficConfig)),
},
Subgraphs: testenv.SubgraphsConfig{
Employees: testenv.SubgraphConfig{
Middleware: func(_ http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
employeesCalls.Add(1)
if employeesCalls.Load() <= failedTries {
simulateConnectionFailureOnClose(w)
return
}

upgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool {
return true
},
Subprotocols: []string{"graphql-transport-ws"},
}
conn, err := upgrader.Upgrade(w, r, nil)
require.NoError(t, err)
defer func() {
_ = conn.Close()
}()

_, _, err = testenv.WSReadMessage(t, conn)
require.NoError(t, err)

err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(`{"type":"connection_ack"}`))
require.NoError(t, err)

err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(timestampMessage))
require.NoError(t, err)

err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(completeMessage))
require.NoError(t, err)
})
},
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
for i := range failedTries + 2 {
conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
err := testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { currentTime { unixTime timeStamp }}"}`),
})
require.NoError(t, err)

_, message, err := testenv.WSReadMessage(t, conn)
require.NoError(t, err)

require.JSONEq(t, defaultErrorMessage, string(message))
require.NoError(t, conn.Close())

switch {
case i < breaker.RequestThreshold-1:
require.Zero(t, xEnv.Observer().FilterMessage("Circuit breaker status changed").Len())
case i == breaker.RequestThreshold-1:
require.Equal(t, 1, xEnv.Observer().FilterMessage("Circuit breaker status changed").Len())
case i > breaker.RequestThreshold-1:
expectedCount := i - (breaker.RequestThreshold - 1)
require.Equal(t, int(expectedCount), xEnv.Observer().FilterMessage("Circuit breaker open, request callback did not execute").Len())
}

}

require.Equal(t, failedTries, employeesCalls.Load())

// Wait for current bucket to be cleaned up
time.Sleep(breaker.RollingDuration + time.Millisecond*500)

// ====
// Verify a success case with messages validated from here onwards
// ====

// Sending a complete must stop the subscription
conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
err := testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { currentTime { unixTime timeStamp }}"}`),
})
require.NoError(t, err)

_, message, err := testenv.WSReadMessage(t, conn)
require.NoError(t, err)
require.JSONEq(t, timestampMessage, string(message))

err = testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{ID: "1", Type: "complete"})
require.NoError(t, err)

err = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err)

_, actualCompleteMessage, err := testenv.WSReadMessage(t, conn)
require.NoError(t, err)
require.JSONEq(t, completeMessage, string(actualCompleteMessage))

xEnv.WaitForSubscriptionCount(0, time.Second*5)
})
})

t.Run("circuit breaker metrics", func(t *testing.T) {
t.Parallel()

Expand Down
24 changes: 20 additions & 4 deletions router-tests/error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1412,7 +1416,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1444,7 +1452,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1476,7 +1488,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down
12 changes: 10 additions & 2 deletions router-tests/panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func TestEnginePanic(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -80,7 +84,11 @@ func TestEnginePanic(t *testing.T) {
EnableSingleFlight: true,
ParseKitPoolSize: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down
Loading
Loading