Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d86e5cc
feat: retry settings config per subgraph
SkArchon Sep 1, 2025
39a200c
fix: review comments
SkArchon Sep 1, 2025
a16a8f9
fix: add test
SkArchon Sep 1, 2025
a8a9cc2
fix: tests
SkArchon Sep 1, 2025
79cdb75
fix: add default options to subgraphs only when required
SkArchon Sep 1, 2025
3344631
fix: review comments
SkArchon Sep 1, 2025
e3cebe5
fix: cleanup formatting
SkArchon Sep 2, 2025
1c9baf2
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 2, 2025
c6c224b
fix: return error
SkArchon Sep 3, 2025
dae502d
fix: review comments
SkArchon Sep 3, 2025
f4f09e1
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 3, 2025
3c7647c
fix: tests
SkArchon Sep 3, 2025
c7b4040
fix: review comments
SkArchon Sep 3, 2025
0ddb674
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 3, 2025
24fbca2
Merge remote-tracking branch 'origin/main' into milinda/retry-per-sub…
SkArchon Sep 16, 2025
a976f16
fix: merge resolving
SkArchon Sep 16, 2025
f4090fa
fix: upgrade request
SkArchon Sep 16, 2025
bceee7e
fix: feature flags
SkArchon Sep 17, 2025
98dde91
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Sep 17, 2025
3131bb7
fix: linting
SkArchon Sep 17, 2025
f353169
fix: linting
SkArchon Sep 17, 2025
72c8268
fix: ci tests
SkArchon Sep 17, 2025
db1964c
fix: increase duration
SkArchon Sep 17, 2025
9826e24
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Oct 16, 2025
8cc3911
Merge remote-tracking branch 'origin/main' into milinda/retry-per-sub…
SkArchon Apr 7, 2026
f37ac89
fix: gofmt
SkArchon Apr 7, 2026
9ec4d8c
Merge branch 'main' into milinda/retry-per-subgraphs
SkArchon Apr 7, 2026
a4d460b
fix: circuit breaker tests
SkArchon Apr 7, 2026
be2b7c9
fix: tests
SkArchon Apr 7, 2026
dfb90dd
fix: tests
SkArchon Apr 7, 2026
4f52652
fix: changes
SkArchon Apr 21, 2026
65b1db8
fix: retry updates
SkArchon Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions router-tests/observability/structured_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ func TestFlakyAccessLogs(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{All: config.GlobalSubgraphRequestRule{BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false}}})),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
Expand Down Expand Up @@ -864,7 +864,7 @@ func TestFlakyAccessLogs(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{All: config.GlobalSubgraphRequestRule{BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false}}})),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
Expand Down Expand Up @@ -996,7 +996,7 @@ func TestFlakyAccessLogs(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{All: config.GlobalSubgraphRequestRule{BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false}}})),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func TestFlakyAccessLogs(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{All: config.GlobalSubgraphRequestRule{BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false}}})),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
Expand Down Expand Up @@ -2476,7 +2476,7 @@ func TestFlakyAccessLogs(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{All: config.GlobalSubgraphRequestRule{BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false}}})),
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
Expand Down
143 changes: 143 additions & 0 deletions router-tests/security/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/wundergraph/cosmo/router-tests/testutils"

"context"
"github.com/gorilla/websocket"
"net/http"
"sort"
"sync/atomic"
Expand Down Expand Up @@ -625,6 +626,148 @@ func TestCircuitBreaker(t *testing.T) {
})
})

// Verifies that failed WebSocket upgrade requests to a subgraph count towards
// the circuit breaker: the first failedTries subscriptions hit a subgraph that
// drops the connection, the following ones must be short-circuited without
// reaching the subgraph, and a later subscription (after the sleep below) must
// be served normally again.
t.Run("verify circuit breaker tripping on upgrade requests", func(t *testing.T) {
	t.Parallel()

	// Number of initial subgraph requests that the middleware below fails.
	const failedTries int64 = 3

	const timestampMessage = `{"type":"next","id":"1","payload":{"data":{"currentTime":{"unixTime":1,"timeStamp":"2021-09-01T12:00:00Z"}}}}`
	const completeMessage = `{"type":"complete","id":"1"}`
	const defaultErrorMessage = `{"id":"1","type":"error","payload":[{"message":"Internal server error"}]}`
	// Subscription sent by the client on every attempt.
	const subscribePayload = `{"query":"subscription { currentTime { unixTime timeStamp }}"}`

	breaker := getCircuitBreakerWithDefaults()
	breaker.RequestThreshold = 3
	breaker.ErrorThresholdPercentage = 100

	breaker.NumBuckets = 1
	breaker.RollingDuration = 5000 * time.Millisecond
	trafficConfig := getTrafficConfigWithTimeout(breaker, 1*time.Second)

	// Counts every request that actually reaches the employees subgraph.
	var employeesCalls atomic.Int64

	testenv.Run(t, &testenv.Config{
		ModifyEngineExecutionConfiguration: func(engineExecutionConfiguration *config.EngineExecutionConfiguration) {
			engineExecutionConfiguration.WebSocketClientReadTimeout = time.Millisecond * 2000
		},
		LogObservation: testenv.LogObservationConfig{
			Enabled:  true,
			LogLevel: zapcore.DebugLevel,
		},
		RouterOptions: []core.Option{
			core.WithSubgraphCircuitBreakerOptions(core.NewSubgraphCircuitBreakerOptions(trafficConfig)),
			core.WithSubgraphTransportOptions(core.NewSubgraphTransportOptions(trafficConfig)),
		},
		Subgraphs: testenv.SubgraphsConfig{
			Employees: testenv.SubgraphConfig{
				Middleware: func(_ http.Handler) http.Handler {
					return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
						// Add returns the post-increment value, so the counter is
						// bumped and compared in a single atomic step. A separate
						// Add followed by Load could observe increments made by
						// other in-flight requests and pick the wrong branch.
						if employeesCalls.Add(1) <= failedTries {
							simulateConnectionFailureOnClose(w)
							return
						}

						// NOTE(review): the require.* calls below stop the test via
						// t.FailNow, which the testing package documents as only
						// valid on the goroutine running the test; this handler
						// presumably runs on a server goroutine — confirm and
						// consider reporting failures without FailNow here.
						upgrader := websocket.Upgrader{
							CheckOrigin: func(_ *http.Request) bool {
								return true
							},
							Subprotocols: []string{"graphql-transport-ws"},
						}
						conn, err := upgrader.Upgrade(w, r, nil)
						require.NoError(t, err)
						defer func() {
							// Best-effort close; the test outcome does not depend on it.
							_ = conn.Close()
						}()

						// Read connection_init
						_, _, err = testenv.WSReadMessage(t, conn)
						require.NoError(t, err)

						err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(`{"type":"connection_ack"}`))
						require.NoError(t, err)

						// Read subscribe message before sending data
						_, _, err = testenv.WSReadMessage(t, conn)
						require.NoError(t, err)

						err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(timestampMessage))
						require.NoError(t, err)

						err = testenv.WSWriteMessage(t, conn, websocket.TextMessage, []byte(completeMessage))
						require.NoError(t, err)
					})
				},
			},
		},
	}, func(t *testing.T, xEnv *testenv.Environment) {
		// Two more attempts than the subgraph fails: every attempt is expected
		// to end in the default error message, the extra ones because the
		// breaker is already open.
		for i := range failedTries + 2 {
			conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
			err := testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{
				ID:      "1",
				Type:    "subscribe",
				Payload: []byte(subscribePayload),
			})
			require.NoError(t, err)

			_, message, err := testenv.WSReadMessage(t, conn)
			require.NoError(t, err)

			require.JSONEq(t, defaultErrorMessage, string(message))
			require.NoError(t, conn.Close())

			switch {
			case i < breaker.RequestThreshold-1:
				// Below the request threshold: no status change logged yet.
				require.Zero(t, xEnv.Observer().FilterMessage("Circuit breaker status changed").Len())
			case i == breaker.RequestThreshold-1:
				// The attempt that reaches the threshold logs exactly one status change.
				require.Equal(t, 1, xEnv.Observer().FilterMessage("Circuit breaker status changed").Len())
			case i > breaker.RequestThreshold-1:
				// Every attempt past the threshold adds one "did not execute" log entry.
				expectedCount := i - (breaker.RequestThreshold - 1)
				require.Equal(t, int(expectedCount), xEnv.Observer().FilterMessage("Circuit breaker open, request callback did not execute").Len())
			}
		}

		// Only the failing attempts reached the subgraph; the extra attempts
		// never incremented the counter.
		require.Equal(t, failedTries, employeesCalls.Load())

		// Ensure all previous subscriptions are fully cleaned up before
		// waiting for the circuit to reset, to prevent leftover subscription
		// cleanup from interfering with the half-open circuit state.
		xEnv.WaitForSubscriptionCount(0, time.Second*5)

		// Wait for current bucket to be cleaned up
		time.Sleep(breaker.RollingDuration*3 + time.Millisecond*1000)

		// ====
		// Verify a success case with messages validated from here onwards
		// ====

		conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
		err := testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{
			ID:      "1",
			Type:    "subscribe",
			Payload: []byte(subscribePayload),
		})
		require.NoError(t, err)

		_, message, err := testenv.WSReadMessage(t, conn)
		require.NoError(t, err)

		require.JSONEq(t, timestampMessage, string(message))

		err = testenv.WSWriteJSON(t, conn, &testenv.WebSocketMessage{ID: "1", Type: "complete"})
		require.NoError(t, err)

		// Bound the wait for the complete message so a missing frame fails the
		// read instead of blocking it.
		err = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
		require.NoError(t, err)

		_, actualCompleteMessage, err := testenv.WSReadMessage(t, conn)
		require.NoError(t, err)
		require.JSONEq(t, completeMessage, string(actualCompleteMessage))

		xEnv.WaitForSubscriptionCount(0, time.Second*5)
	})
})

t.Run("circuit breaker metrics", func(t *testing.T) {
t.Parallel()

Expand Down
24 changes: 20 additions & 4 deletions router-tests/security/error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1595,7 +1599,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1627,7 +1635,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -1659,7 +1671,11 @@ func TestErrorPropagation(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down
12 changes: 10 additions & 2 deletions router-tests/security/panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func TestEnginePanic(t *testing.T) {
EnableSingleFlight: true,
MaxConcurrentResolvers: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down Expand Up @@ -80,7 +84,11 @@ func TestEnginePanic(t *testing.T) {
EnableSingleFlight: true,
ParseKitPoolSize: 1,
}),
core.WithSubgraphRetryOptions(false, "", 0, 0, 0, "", nil),
core.WithSubgraphRetryOptions(core.NewSubgraphRetryOptions(config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
BackoffJitterRetry: config.BackoffJitterRetry{Enabled: false},
},
})),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Expand Down
Loading
Loading