Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Bugs Fixed

* Fixed excessive `GetDatabaseAccount` HTTP calls when using preferred regions, and stopped data-plane retries from trailing into the customer-supplied (default) endpoint once account topology is populated. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815).
* Partition key range cache now serves concurrent callers from a single in-flight refresh per container, and the cached routing map remains readable while a refresh is in progress. The refresh runs on a detached `context.Background()`, so one caller's cancellation no longer aborts the shared fetch for the other waiters; each caller continues to honor its own context deadline. See [PR 26855](https://github.com/Azure/azure-sdk-for-go/pull/26855).
* Partition key range cache change-feed pagination is now resilient to mid-drain throttling. 429 responses are retried indefinitely (with capped linear backoff + jitter) since the service is explicitly asking the client to slow down, and the pages already accumulated are preserved instead of restarting the drain from page 1 on the next refresh. See [PR 26855](https://github.com/Azure/azure-sdk-for-go/pull/26855).

Expand Down
2 changes: 1 addition & 1 deletion sdk/data/azcosmos/cosmos_client_retry_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, i
}
}

err := p.gem.Update(req.Raw().Context(), isWriteOperation)
err := p.gem.Update(req.Raw().Context(), true)
if err != nil {
return false, err
}
Expand Down
775 changes: 775 additions & 0 deletions sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go

Large diffs are not rendered by default.

223 changes: 208 additions & 15 deletions sdk/data/azcosmos/cosmos_global_endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"

azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
Expand All @@ -24,8 +25,42 @@ type globalEndpointManager struct {
preferredLocations []string
locationCache *locationCache
refreshTimeInterval time.Duration
gemMutex sync.RWMutex
gemMutex sync.Mutex
lastUpdateTime time.Time
// lastAttemptTime records the most recent Update attempt regardless of
// outcome. shouldRefresh() honours it so a failed GetAccountProperties is
// throttled to refreshTimeInterval just like a successful one, preventing
// a failure-loop where every caller re-issues the HTTP call immediately.
lastAttemptTime time.Time
// lastUpdateErr is the error from the most recent refresh attempt. It is
// returned to callers that hit the throttle window before the GEM has
// ever been successfully populated, so a chronic bootstrap failure is
// surfaced on every request rather than silently swallowed.
lastUpdateErr error
// everPopulated is set true on the first successful refresh and never
// reset, even when invalidate() zeroes lastUpdateTime. populated() reads
// this flag rather than the timestamp, so a transient post-invalidation
// refresh failure does not stall the data plane: requests can still route
// through the existing locationCache topology while the next refresh
// attempt waits for the throttle window. Stored as atomic.Bool so the
// hot-path policy check is lock-free.
everPopulated atomic.Bool
// inflight coalesces concurrent Update callers: only the first does the
// HTTP call; the rest wait on the per-flight done channel and read
// per-flight err. Each refresh has its own *updateFlight so late waiters
// cannot accidentally read a subsequent flight's error.
inflight *updateFlight
// invalidationGen is bumped by invalidate(). The Update leader snapshots
// it before the HTTP call and, if it changed by completion, declines to
// advance lastUpdateTime/lastAttemptTime -- so an invalidation that
// happens during an in-flight refresh is not lost.
invalidationGen uint64
}

// updateFlight tracks a single in-flight refresh. One value is created per
// refresh by the Update leader and shared with every caller that arrives
// while that refresh is running.
type updateFlight struct {
// done is closed by the leader once the refresh has finished; waiters
// block on it (or on their own context) inside Update.
done chan struct{}
// err is the outcome of this flight. The leader writes it before closing
// done, and waiters read it only after done is closed.
err error
}

func newGlobalEndpointManager(clientEndpoint string, pipeline azruntime.Pipeline, preferredLocations []string, refreshTimeInterval time.Duration, enableCrossRegionRetries bool) (*globalEndpointManager, error) {
Expand Down Expand Up @@ -59,19 +94,57 @@ func (gem *globalEndpointManager) GetReadEndpoints() ([]url.URL, error) {
}

// MarkEndpointUnavailableForWrite marks endpoint as unavailable for writes in
// the location cache and, if the endpoint was not already marked, invalidates
// the GEM so that the next Update issues a fresh refresh.
//
// It returns the error reported by the location cache, or nil on success.
func (gem *globalEndpointManager) MarkEndpointUnavailableForWrite(endpoint url.URL) error {
	// markEndpointUnavailableForWrite atomically reports whether the
	// endpoint was already unavailable for write in the same critical
	// section that performs the mark. This eliminates the check-then-act
	// race that would otherwise let concurrent markers all observe
	// "wasn't unavailable" and each invalidate the GEM. A new event
	// invalidates the GEM cache so the next Update(false) actually fires
	// -- the first 403/WriteForbidden or network error for an endpoint
	// may indicate a failover and we want to learn about new write
	// regions promptly. Subsequent retries within the unavailability
	// window do not invalidate.
	wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForWrite(endpoint)
	if err != nil {
		return err
	}
	if !wasAlreadyUnavailable {
		gem.invalidate()
	}
	return nil
}

// MarkEndpointUnavailableForRead marks endpoint as unavailable for reads in
// the location cache and, if the endpoint was not already marked, invalidates
// the GEM so that the next Update issues a fresh refresh.
//
// It returns the error reported by the location cache, or nil on success.
func (gem *globalEndpointManager) MarkEndpointUnavailableForRead(endpoint url.URL) error {
	// The location cache reports whether the endpoint was already marked so
	// that only the first marker of a new outage triggers an invalidation.
	wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForRead(endpoint)
	if err != nil {
		return err
	}
	if !wasAlreadyUnavailable {
		gem.invalidate()
	}
	return nil
}

// invalidate resets the refresh throttle so that the next non-forced Update
// performs a real refresh. Under gemMutex it zeroes both lastUpdateTime and
// lastAttemptTime and increments invalidationGen; the generation bump lets a
// refresh that is already in flight detect the invalidation and refrain from
// writing its post-call timestamps over it. Called when an endpoint is newly
// marked unavailable, so potential failover targets are discovered without
// waiting for the next refresh interval.
func (gem *globalEndpointManager) invalidate() {
	var zero time.Time

	gem.gemMutex.Lock()
	gem.lastUpdateTime, gem.lastAttemptTime = zero, zero
	gem.invalidationGen++
	gem.gemMutex.Unlock()
}

// GetEndpointLocation returns the location cache's lookup result for
// endpoint.
func (gem *globalEndpointManager) GetEndpointLocation(endpoint url.URL) string {
	location := gem.locationCache.getLocation(endpoint)
	return location
}

// CanUseMultipleWriteLocations reports the location cache's answer on whether
// multiple write locations may be used.
func (gem *globalEndpointManager) CanUseMultipleWriteLocations() bool {
	return gem.locationCache.CanUseMultipleWriteLocs()
}

func (gem *globalEndpointManager) IsEndpointUnavailable(endpoint url.URL, ops requestedOperations) bool {
Expand All @@ -82,39 +155,159 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() {
gem.locationCache.refreshStaleEndpoints()
}

// populated reports whether a refresh has ever succeeded for this GEM. It
// reads the everPopulated flag rather than checking lastUpdateTime, so the
// answer stays true after invalidate() zeroes the timestamps and a transient
// refresh failure after invalidation does not stall the data plane. The read
// is a single atomic load and does not take gemMutex.
func (gem *globalEndpointManager) populated() bool {
	everPopulated := gem.everPopulated.Load()
	return everPopulated
}

// hasInflight reports whether a refresh currently occupies the in-flight
// slot. It exists for tests, which poll it to wait until a leader has claimed
// the slot before issuing follow-up calls; routing the read through this
// accessor keeps gemMutex handling out of test code.
func (gem *globalEndpointManager) hasInflight() bool {
	gem.gemMutex.Lock()
	current := gem.inflight
	gem.gemMutex.Unlock()
	return current != nil
}

// ShouldRefresh reports whether the refresh interval has elapsed since the
// most recent refresh attempt. It takes gemMutex and delegates to
// shouldRefresh, which requires the lock to be held.
func (gem *globalEndpointManager) ShouldRefresh() bool {
	// gemMutex is a plain sync.Mutex, so it is acquired exactly once here;
	// a second acquisition on the same goroutine would deadlock.
	gem.gemMutex.Lock()
	defer gem.gemMutex.Unlock()
	return gem.shouldRefresh()
}

// shouldRefresh reports whether more than refreshTimeInterval has passed
// since the later of lastUpdateTime and lastAttemptTime. It reads fields
// guarded by gemMutex, so the caller must hold the lock.
func (gem *globalEndpointManager) shouldRefresh() bool {
	// Honor whichever happened more recently: a successful update or an
	// attempt that failed. Failures must be throttled too, otherwise a
	// failing endpoint causes every caller to re-issue GetDatabaseAccount
	// immediately.
	last := gem.lastUpdateTime
	if gem.lastAttemptTime.After(last) {
		last = gem.lastAttemptTime
	}
	return time.Since(last) > gem.refreshTimeInterval
}

// ResolveServiceEndpoint forwards its arguments unchanged to the location
// cache's resolveServiceEndpoint and returns the endpoint it selects.
func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, resourceType resourceType, isWriteOperation, useWriteEndpoint bool) url.URL {
	resolved := gem.locationCache.resolveServiceEndpoint(
		locationIndex,
		resourceType,
		isWriteOperation,
		useWriteEndpoint,
	)
	return resolved
}

// Update refreshes the GEM cache by calling GetAccountProperties when needed.
// Concurrent callers are coalesced via a single-in-flight pattern so at most
// one HTTP call is in flight per client at any time. Both successes and
// failures advance lastAttemptTime (unless an invalidation raced with the
// refresh) so the next refresh is throttled to refreshTimeInterval,
// preventing a failure-storm.
//
// If the GEM has never been successfully populated and the throttle is
// active, Update returns the cached error from the most recent failed
// attempt so a chronic bootstrap failure is surfaced on every request
// rather than silently swallowed. Once the GEM HAS been populated, a
// throttled Update returns nil even after a subsequent refresh failure --
// the data plane keeps using the cached topology until the throttle expires
// and a fresh refresh can run.
//
// Update respects ctx cancellation for waiters. The leader's HTTP call runs
// under context.WithoutCancel(ctx) so an unrelated caller-side cancellation
// does not poison the shared flight result for other coalesced waiters
// (GetAccountProperties applies its own 60s timeout). Waiters select between
// flight completion and their own ctx.Done() so a caller-side timeout cannot
// be exceeded by an unrelated stuck refresh.
//
// Locking: gemMutex is released explicitly on every path before blocking or
// returning; it must NOT also be released via defer, because unlocking an
// already-unlocked sync.Mutex is a fatal runtime error.
func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) error {
	gem.gemMutex.Lock()
	if !gem.shouldRefresh() && !forceRefresh {
		// Throttled. Surface the cached error only if we have NEVER
		// successfully populated the GEM -- otherwise the data plane has
		// a valid cached topology and should continue working until the
		// next refresh attempt succeeds. Only non-forced callers reach
		// this branch: forceRefresh bypasses the throttle entirely.
		var cached error
		if !gem.everPopulated.Load() {
			cached = gem.lastUpdateErr
		}
		gem.gemMutex.Unlock()
		return cached
	}
	if gem.inflight != nil {
		// Another goroutine is performing a refresh. Wait for it and share
		// its result rather than spawning a duplicate HTTP call. The result
		// lives on the per-flight struct so subsequent flights cannot
		// overwrite it. Honour the waiter's ctx so a caller-side timeout
		// is not extended by the leader's HTTP call duration.
		flight := gem.inflight
		gem.gemMutex.Unlock()
		select {
		case <-flight.done:
			return flight.err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// We are the leader. Publish the inflight flight and snapshot the
	// invalidation generation, then release the lock while we perform the
	// HTTP call so ShouldRefresh and other non-Update paths don't block on
	// a network round-trip.
	flight := &updateFlight{done: make(chan struct{})}
	gem.inflight = flight
	genAtStart := gem.invalidationGen
	gem.gemMutex.Unlock()

	// Panic-safe cleanup: if refreshOnce (or anything it transitively calls
	// -- the pipeline, JSON unmarshal, locationCache.update) panics, we
	// MUST still clear gem.inflight and close flight.done, otherwise every
	// subsequent Update caller blocks forever on <-flight.done. We capture
	// any panic, record it as the flight error, and re-panic after cleanup.
	var err error
	defer func() {
		r := recover()
		gem.gemMutex.Lock()
		if r != nil && err == nil {
			err = fmt.Errorf("panic in GEM refresh: %v", r)
		}
		flight.err = err
		gem.lastUpdateErr = err
		if gem.invalidationGen == genAtStart {
			// No invalidation occurred during the flight, so commit the
			// timestamps and let the throttle take effect.
			gem.lastAttemptTime = time.Now()
			if err == nil {
				gem.lastUpdateTime = gem.lastAttemptTime
				gem.everPopulated.Store(true)
			}
		}
		// If invalidationGen changed, leave the timestamps untouched so
		// the next caller observes shouldRefresh()==true and performs a
		// fresh refresh that reflects the post-invalidation state.
		gem.inflight = nil
		gem.gemMutex.Unlock()
		// Close only after flight.err is written and the lock is released,
		// so woken waiters read a settled result.
		close(flight.done)
		if r != nil {
			panic(r)
		}
	}()
	err = gem.refreshOnce(context.WithoutCancel(ctx))
	return err
}

// refreshOnce performs the actual GetAccountProperties HTTP call and
// propagates the result into the location cache. It must not hold gemMutex.
func (gem *globalEndpointManager) refreshOnce(ctx context.Context) error {
accountProperties, err := gem.GetAccountProperties(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve account properties: %v", err)
return fmt.Errorf("failed to retrieve account properties: %w", err)
}
err = gem.locationCache.update(
if err := gem.locationCache.update(
accountProperties.WriteRegions,
accountProperties.ReadRegions,
gem.preferredLocations,
&accountProperties.EnableMultipleWriteLocations)
if err != nil {
return fmt.Errorf("failed to update location cache: %v", err)
&accountProperties.EnableMultipleWriteLocations,
); err != nil {
return fmt.Errorf("failed to update location cache: %w", err)
}
gem.lastUpdateTime = time.Now()
return nil
}

Expand Down
63 changes: 52 additions & 11 deletions sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,69 @@ package azcosmos
import (
"context"
"net/http"
"sync"
"runtime/debug"
"sync/atomic"

azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
)

// globalEndpointManagerPolicy is the pipeline policy that keeps the global
// endpoint manager refreshed as requests flow through it.
type globalEndpointManagerPolicy struct {
	// gem is the endpoint manager this policy refreshes and consults.
	gem *globalEndpointManager
	// asyncRefreshPending gates the spawn of the async refresh goroutine.
	// Without this gate every request that observes ShouldRefresh()==true
	// (potentially thousands during a burst that arrives right as the
	// throttle expires) would spawn its own goroutine, each of which would
	// then queue as a waiter inside gem.Update. The singleflight in Update
	// collapses them to one HTTP call, but the goroutine + select overhead
	// is wasted. CAS this to true before spawning; the goroutine clears it
	// on exit.
	asyncRefreshPending atomic.Bool
}

func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, error) {
// Synchronous bootstrap: while the GEM has never been successfully
// populated, every request synchronously calls Update. Concurrent
// callers are coalesced inside gem.Update via the single-in-flight
// pattern, so at most one HTTP call is in flight. If the call fails
// the throttle in gem.Update (lastAttemptTime) ensures subsequent
// bootstrap retries respect refreshTimeInterval, preventing a
// failure storm.
var err error
p.once.Do(func() {
if !p.gem.populated() {
// Use the same context, but without the cancellation signal.
// We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request.
err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), true)
})
if p.gem.ShouldRefresh() {
// We DO want to preserve things like context values, but the GEM
// update needs to complete fully, even if the user cancels the
// triggering request.
err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false)
}
if p.gem.ShouldRefresh() && p.asyncRefreshPending.CompareAndSwap(false, true) {
// Capture the context before launching the goroutine so we do not
// depend on req's lifetime after the policy returns.
refreshCtx := context.WithoutCancel(req.Raw().Context())
go func() {
// Use the same context, but without the cancellation signal.
// We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request.
_ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false)
defer p.asyncRefreshPending.Store(false)
// gem.Update's panic-safe defer re-panics after cleanup. We
// recover here so a panic in the GEM pipeline does not bring
// down the host process via this detached goroutine. The
// recovered value is logged (rather than silently dropped) so
// production crashes remain triageable.
defer func() {
if r := recover(); r != nil {
log.Writef(azlog.EventResponse,
"panic in azcosmos GEM async refresh: %v\n%s",
r, debug.Stack())
}
}()
// Log refresh failures so a chronically failing GEM is
// observable. Without this, post-bootstrap topology drift is
// silent: the data plane keeps routing to the cached topology
// and callers see no signal until requests start failing.
if err := p.gem.Update(refreshCtx, false); err != nil {
log.Writef(azlog.EventResponse,
"azcosmos GEM async refresh failed: %v", err)
}
}()
}
if p.gem.CanUseMultipleWriteLocations() {
Expand Down
Loading
Loading