Merged
17 commits
8c4faa8
azcosmos: don't gate cross-region connection-error failover on metada…
tvaron3 May 26, 2026
97fad46
azcosmos: also fix retryCount-on-failover and URL-key mismatch in una…
tvaron3 May 26, 2026
e528b1c
azcosmos: unblock cross-region failover from synchronous gem.Update
tvaron3 May 27, 2026
09419e6
azcosmos: shorten CHANGELOG entry for PR 26889
tvaron3 May 27, 2026
0fabe5f
azcosmos: add CHANGELOG entry for PR 26889
tvaron3 May 27, 2026
4c19925
azcosmos: clarify test comment to match injected error (PR review fee…
tvaron3 May 27, 2026
7920f43
azcosmos: address deep-review findings on the cross-region failover path
tvaron3 May 27, 2026
b934b51
azcosmos: add azlog, Writef, Retriable, unrecovered to cspell dictionary
tvaron3 May 28, 2026
9777c3c
Revert "azcosmos: add azlog, Writef, Retriable, unrecovered to cspell…
tvaron3 May 28, 2026
32c7cc3
azcosmos: scope cspell allowlist to sdk/data/azcosmos/**
tvaron3 May 28, 2026
ae7ea50
Revert "azcosmos: scope cspell allowlist to sdk/data/azcosmos/**"
tvaron3 May 28, 2026
c03eaf0
azcosmos: add cSpell:ignore directive for retry-policy-only words
tvaron3 May 28, 2026
7758fb0
azcosmos: add cSpell:ignore for test-pipeline name strings
tvaron3 May 28, 2026
5b41e4c
azcosmos: extend cSpell:ignore to remaining flagged test files
tvaron3 May 28, 2026
2dfd8f1
azcosmos: add cSpell:ignore to remaining files touched by the PR
tvaron3 May 28, 2026
766b5d8
azcosmos: silence remaining cspell flags (verified locally)
tvaron3 May 28, 2026
0125e44
azcosmos: gofmt cosmos_dbaccount_refresh_test.go
tvaron3 May 28, 2026
3 changes: 3 additions & 0 deletions sdk/data/azcosmos/CHANGELOG.md
@@ -1,5 +1,7 @@
# Release History

<!-- cSpell:ignore documentdb unmarshalling -->

## 1.5.0-beta.7 (Unreleased)

### Features Added
@@ -8,6 +10,7 @@

### Bugs Fixed

* Retries on 403/`WriteForbidden` now refresh the global endpoint manager asynchronously (fire-and-forget, CAS-gated) instead of blocking on a synchronous `gem.Update`. See [PR 26889](https://github.com/Azure/azure-sdk-for-go/pull/26889).
* Connection-error retry policy now attempts up to 3 retries against the current region before failing over, and performs at most one cross-region failover per call. Cross-region failover for writes only occurs when the error proves the request never reached the service (DNS, dial, TLS handshake, `ECONNREFUSED`, etc.); writes on ambiguous transport failures (e.g. `ECONNRESET`, `EOF`, transport-level timeouts) no longer fail over to another region, avoiding potential duplicate writes. Reads still fail over for any transport error. Caller-set context deadlines or cancellations short-circuit the policy without consuming the caller's budget with retries. See [PR 26858](https://github.com/Azure/azure-sdk-for-go/pull/26858).
* HTTP `408 Request Timeout` responses are now handled by the Cosmos client retry policy: reads are retried exactly once against another region, and writes are returned to the caller immediately to avoid potential duplicates. See [PR 26858](https://github.com/Azure/azure-sdk-for-go/pull/26858).
* Fixed excessive `GetDatabaseAccount` HTTP calls when using preferred regions, and stopped data-plane retries from trailing into the customer-supplied (default) endpoint once account topology is populated. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815).
226 changes: 179 additions & 47 deletions sdk/data/azcosmos/cosmos_client_retry_policy.go
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

// cSpell:ignore azlog Writef Retriable unrecovered

package azcosmos

import (
@@ -11,16 +13,99 @@ import (
"io"
"net"
"net/http"
"runtime/debug"
"sync/atomic"
"syscall"
"time"

azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/errorinfo"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
)

type clientRetryPolicy struct {
gem *globalEndpointManager
// asyncRefreshState tracks the in-flight goroutine spawned by
// asyncForceRefreshGEM (Idle/Pending/Failed). See its doc comment.
asyncRefreshState atomic.Int32
// lastForcedRefreshUnixNano is the completion time of the most
// recent asyncForceRefreshGEM. Read by staleForcedRefresh to
// rate-limit repeat refreshes against the same endpoint.
lastForcedRefreshUnixNano atomic.Int64
}

const (
asyncRefreshIdle int32 = 0
asyncRefreshPending int32 = 1
asyncRefreshFailed int32 = 2

// forcedRefreshMinInterval rate-limits repeat forced refreshes
// against an already-unavailable endpoint. Must be >=
// defaultBackoff*time.Second so a tight 403 loop cannot bypass it.
forcedRefreshMinInterval = 2 * time.Second
)

// asyncForceRefreshGEM kicks off a forced GEM topology refresh in a
// detached goroutine. The refresh must never block a data-plane retry:
// during a regional outage the global FQDN often resolves to the same
// regional FE pool we just marked unavailable, so a synchronous Update
// can stall and prevent failover.
//
// asyncRefreshState (CAS-gated) caps in-flight refreshes at one per
// policy. We run on context.Background() so a near-expired caller
// deadline cannot abort the refresh. Panics from gem.Update are
// recovered + logged but NOT re-panicked (an unrecovered panic in a
// detached goroutine terminates the process).
//
// Returns true if a refresh was actually spawned.
func (p *clientRetryPolicy) asyncForceRefreshGEM() bool {
for {
state := p.asyncRefreshState.Load()
if state == asyncRefreshPending {
return false
}
if p.asyncRefreshState.CompareAndSwap(state, asyncRefreshPending) {
break
}
}
go func() {
err := error(nil)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in azcosmos retry-policy async GEM refresh: %v", r)
log.Writef(azlog.EventResponse, "%v\n%s", err, debug.Stack())
}
// Record completion time BEFORE flipping state so callers
// that observe Idle also see the freshly-updated timestamp.
p.lastForcedRefreshUnixNano.Store(time.Now().UnixNano())
if err != nil {
p.asyncRefreshState.Store(asyncRefreshFailed)
} else {
p.asyncRefreshState.Store(asyncRefreshIdle)
}
}()
err = p.gem.Update(context.Background(), true)
if err != nil {
log.Writef(azlog.EventResponse,
"azcosmos retry-policy async GEM refresh failed: %v", err)
}
}()
return true
}
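
The CAS gate and panic handling in asyncForceRefreshGEM can be exercised in isolation. The sketch below is a simplified stand-in, not the SDK code: `refresher`, `tryStart`, and the `refresh` callback are hypothetical names (the callback plays the role of gem.Update), and the `sync.WaitGroup` exists only so the demo can wait for the detached goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	idle int32 = iota
	pending
	failed
)

// refresher is a hypothetical stand-in for the retry policy.
type refresher struct {
	state   atomic.Int32
	refresh func() error   // plays the role of gem.Update(ctx, true)
	wg      sync.WaitGroup // demo-only: lets callers wait for completion
}

// tryStart spawns at most one refresh at a time. It returns true only for
// the caller that won the compare-and-swap into the pending state.
func (r *refresher) tryStart() bool {
	for {
		s := r.state.Load()
		if s == pending {
			return false // another refresh is already in flight
		}
		if r.state.CompareAndSwap(s, pending) {
			break
		}
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		var err error
		defer func() {
			// Recover so a panic in the refresh cannot kill the process.
			if rec := recover(); rec != nil {
				err = fmt.Errorf("panic during refresh: %v", rec)
			}
			if err != nil {
				r.state.Store(failed)
			} else {
				r.state.Store(idle)
			}
		}()
		err = r.refresh()
	}()
	return true
}

func main() {
	release := make(chan struct{})
	r := &refresher{refresh: func() error {
		<-release // simulate a slow metadata call
		return errors.New("metadata endpoint unreachable")
	}}
	fmt.Println("first caller spawned: ", r.tryStart())
	fmt.Println("second caller spawned:", r.tryStart())
	close(release)
	r.wg.Wait()
	fmt.Println("ended in failed state:", r.state.Load() == failed)
}
```

Because the swap to pending happens before the goroutine starts, a second caller arriving immediately afterwards is already gated, and either outcome (idle or failed) reopens the gate for a later attempt.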

// staleForcedRefresh reports whether the rate-limit window has
// elapsed since the last completed asyncForceRefreshGEM (or no refresh
// has run yet). Used to permit follow-up refreshes for repeat 403s
// against an already-unavailable endpoint -- critical for single-master
// writes, which cannot reroute locally.
func (p *clientRetryPolicy) staleForcedRefresh() bool {
last := p.lastForcedRefreshUnixNano.Load()
if last == 0 {
return true
}
return time.Since(time.Unix(0, last)) >= forcedRefreshMinInterval
}

// Retry context for the request
@@ -43,6 +128,11 @@ type retryContext struct {
// single cross-region retry for an HTTP 408. Only reads are retried
// on 408; writes are returned to the caller immediately.
requestTimeoutRetryDone bool
// resolveFromHead is a one-shot signal to the outer Do loop to use
// locationIndex 0 instead of retryCount. Set by retry paths that
// demote-to-tail (MarkEndpointUnavailable* moves the bad endpoint
// to the tail of the route list).
resolveFromHead bool
}

const maxRetryCount = 120
@@ -97,7 +187,13 @@ func (p *clientRetryPolicy) Do(req *policy.Request) (*http.Response, error) {
for {
// Update the retry context with the latest retry values
req.SetOperationValue(retryContext)
resolvedEndpoint := p.gem.ResolveServiceEndpoint(retryContext.retryCount, o.resourceType, o.isWriteOperation, retryContext.useWriteEndpoint)
// Consume the one-shot resolveFromHead override.
locationIndex := retryContext.retryCount
if retryContext.resolveFromHead {
locationIndex = 0
retryContext.resolveFromHead = false
}
resolvedEndpoint := p.gem.ResolveServiceEndpoint(locationIndex, o.resourceType, o.isWriteOperation, retryContext.useWriteEndpoint)
regionName := p.gem.GetEndpointLocation(resolvedEndpoint)
req.Raw().Host = resolvedEndpoint.Host
req.Raw().URL.Host = resolvedEndpoint.Host
@@ -264,50 +360,43 @@ func (p *clientRetryPolicy) attemptRetryOnNetworkError(req *policy.Request, kind
// without producing a different endpoint.
canCrossRegionWrite := !isWriteOperation || p.gem.CanUseMultipleWriteLocations()
if isWriteOperation && (kind != connectionErrorNotSent || !canCrossRegionWrite) {
// Ambiguous failure, or single-master write: we cannot safely
// retry on another region. Mark the endpoint unavailable for
// reads so concurrent requests learn about the outage, but do
// not mark it unavailable for writes on a single-master
// account (we have nowhere else to send writes).
//
// Intentionally no gem.Update(ctx, true) here: as of PR #26815
// MarkEndpointUnavailable* invalidates the GEM cache once per
// newly-unavailable endpoint, so the *next* caller's
// Update(false) will issue a refresh on its own. We skip the
// synchronous refresh because connection errors do not
// indicate that account topology has changed — they just say
// "this region is unhealthy right now." Forcing a refresh on
// every give-up under a regional outage would amplify the
// outage by piling GetDatabaseAccount calls on the metadata
// endpoint precisely when we want to be most responsive.
if err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL); err != nil {
// Ambiguous failure or single-master write: cannot safely retry
// on another region. Mark unavailable for reads (concurrent
// readers learn), but not for writes on single-master (nowhere
// else to send writes). No forced gem.Update: the invalidate()
// inside MarkEndpointUnavailable* will arm the next non-force
// Update on its own, and a connection error is not a topology
// change.
if _, err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL); err != nil {
return false, err
}
if canCrossRegionWrite {
if err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL); err != nil {
if _, err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL); err != nil {
return false, err
}
}
return false, nil
}

if err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL); err != nil {
if _, err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL); err != nil {
return false, err
}
if isWriteOperation {
if err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL); err != nil {
if _, err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL); err != nil {
return false, err
}
}
// Force a refresh so the new unavailability is reflected in
// readEndpoints / writeEndpoints for both this request and any
// concurrent requests racing through resolveServiceEndpoint.
if err := p.gem.Update(req.Raw().Context(), true); err != nil {
return false, err
}
// No forced gem.Update: gating the failover on it would surface a
// metadata-endpoint timeout (the global FQDN often resolves to the
// same regional FE pool we just marked unavailable) and skip the
// cross-region retry. invalidate() inside MarkEndpointUnavailable*
// arms the next non-force Update for real topology changes.

retryContext.sameRegionRetryCount = 0
retryContext.retryCount += 1
// Demote-to-tail leaves the bad endpoint at index 1+; force the
// next resolve to use index 0 instead of the (possibly bumped)
// retryCount, otherwise we'd route right back to the demoted slot.
retryContext.resolveFromHead = true
retryContext.crossRegionFailoverDone = true
if sleepErr := sleepWithContext(req.Raw().Context(), defaultBackoff*time.Second); sleepErr != nil {
return false, fmt.Errorf("%w: underlying transport error: %v", sleepErr, transportErr)
@@ -319,34 +408,85 @@ func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, i
if (retryContext.retryCount > maxRetryCount) || !p.gem.locationCache.enableCrossRegionRetries {
return false, nil
}
var wasAlreadyUnavailable bool
var err error
if isWriteOperation {
err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
wasAlreadyUnavailable, err = p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
if err != nil {
return false, err
}
} else {
err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
wasAlreadyUnavailable, err = p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
if err != nil {
return false, err
}
}

err := p.gem.Update(req.Raw().Context(), true)
if err != nil {
return false, err
}
// Kick off a forced async refresh on:
// - a NEW unavailability event for this endpoint (first
// transition; we always want fresh topology after a brand-new
// mark), OR
// - a repeat mark when no refresh is currently in flight AND
// the last completed forced refresh is older than
// forcedRefreshMinInterval. This single condition covers
// both recovery from a successful-but-stale prior refresh
// (single-master writes can't reroute locally) and recovery
// from a failed prior refresh (metadata endpoint was
// transiently unhealthy) without storming GetDatabaseAccount
// when the metadata endpoint is sustained-unhealthy.
//
// MarkEndpointUnavailable* already calls invalidate() on the first
// transition, so the next non-force Update will refresh anyway --
// but for single-master writes the local route list cannot reroute
// around the bad write endpoint, so without these additional
// forced refreshes the client could be stuck on the failed write
// region for refreshTimeInterval (default 5 min).
//
// Fire-and-forget: we do NOT block the retry on its outcome.
// MarkEndpointUnavailable* has already invalidated the GEM cache
// and demoted the bad endpoint locally, so the next
// ResolveServiceEndpoint will pick the failover region (in
// multi-region scenarios) whether or not the metadata refresh
// succeeds. Blocking here would surface a transient metadata
// failure to the caller and skip the very cross-region retry this
// function is supposed to perform.
state := p.asyncRefreshState.Load()
shouldForceRefresh := !wasAlreadyUnavailable ||
(state != asyncRefreshPending && p.staleForcedRefresh())
if shouldForceRefresh {
p.asyncForceRefreshGEM()
}

// Force the next resolve to use locationIndex 0. Without this, the
// outer Do() loop bumps retryCount += 1 after we return true, which
// for a two-region account turns readEndpoints[1 % 2] back into the
// just-marked unhealthy endpoint that MarkEndpointUnavailable*
// demoted to the tail. resolveFromHead is a one-shot consumed by
// the outer loop's ResolveServiceEndpoint call.
retryContext.resolveFromHead = true

time.Sleep(defaultBackoff * time.Second)
if sleepErr := sleepWithContext(req.Raw().Context(), defaultBackoff*time.Second); sleepErr != nil {
return false, sleepErr
}
return true, nil
}
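
The two-region case described in the resolveFromHead comment can be reproduced with a toy route list. This is a sketch under stated assumptions: `demoteToTail` and `resolve` are hypothetical helpers that imitate demote-to-tail and index-modulo resolution, and the region names are made up. They are not the locationCache implementation.

```go
package main

import "fmt"

// demoteToTail returns a copy of routes with bad moved to the end,
// imitating what marking an endpoint unavailable does to the route list.
func demoteToTail(routes []string, bad string) []string {
	out := make([]string, 0, len(routes))
	found := false
	for _, r := range routes {
		if r == bad {
			found = true
			continue
		}
		out = append(out, r)
	}
	if found {
		out = append(out, bad)
	}
	return out
}

// resolve imitates index-modulo endpoint resolution.
func resolve(routes []string, locationIndex int) string {
	return routes[locationIndex%len(routes)]
}

func main() {
	// "east" fails and is demoted; the healthy "west" is now at the head.
	routes := demoteToTail([]string{"east", "west"}, "east")

	retryCount := 1 // bumped by the outer loop after the retry is approved
	fmt.Println("using retryCount:", resolve(routes, retryCount))
	fmt.Println("using index 0:   ", resolve(routes, 0))
}
```

With two regions, index 1 % 2 lands on the demoted endpoint, which is why the one-shot override resolves from index 0 instead.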

func (p *clientRetryPolicy) attemptRetryOnSessionUnavailable(isWriteOperation bool, retryContext *retryContext) bool {
if p.gem.CanUseMultipleWriteLocations() {
endpoints := p.gem.locationCache.locationInfo.availReadLocations
// Snapshot multi-write capability AND the relevant slice length
// under a single RLock. The async refresh paths (in this file and
// in globalEndpointManagerPolicy) can call locationCache.update
// concurrently, which rewrites enableMultipleWriteLocations and
// availRead/WriteLocations under mapMutex.Lock(). Sampling these
// across two separate lock acquisitions can yield a mixed snapshot
// (multi-write decision from before a refresh + slice length from
// after it, or vice versa), causing the wrong branch to be taken.
multiWrite, readN, writeN := p.gem.locationCache.sessionRetrySnapshot()
if multiWrite {
n := readN
if isWriteOperation {
endpoints = p.gem.locationCache.locationInfo.availWriteLocations
n = writeN
}
if retryContext.sessionRetryCount >= len(endpoints) {
if retryContext.sessionRetryCount >= n {
return false
}
} else {
@@ -387,14 +527,6 @@ func (p *clientRetryPolicy) attemptRetryOnRequestTimeout(req *policy.Request, is
return false, nil
}

if err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL); err != nil {
return false, err
}
// Force a refresh so the unavailability is reflected in
// readEndpoints for this and concurrent requests.
if err := p.gem.Update(req.Raw().Context(), true); err != nil {
return false, err
}
retryContext.requestTimeoutRetryDone = true
// Preserve the caller's cancellation cause if their context fires
// during the backoff so errors.Is(returned, context.DeadlineExceeded)