From 462d99b587f55ddc1017be8b96af86f014706b1a Mon Sep 17 00:00:00 2001
From: Tomas Varon
Date: Tue, 19 May 2026 12:47:38 -0700
Subject: [PATCH 1/8] Cosmos: fix excess GetDatabaseAccount calls and default-endpoint fallback (#25468)

Fixes issue #25468 (excess GetDatabaseAccount HTTP calls with preferred regions) plus the related default-endpoint fallback in data-plane routing.

Root causes addressed:
* GEM Update only advanced lastUpdateTime on success, so any failed refresh caused every subsequent caller to re-issue the HTTP call immediately (no 5-min throttle on failures).
* globalEndpointManagerPolicy.Do spawned a goroutine per request whenever ShouldRefresh() was true, with no in-flight coalescing.
* attemptRetryOnEndpointFailure passed isWriteOperation as forceRefresh, so every retried write 403 force-refreshed GEM regardless of the refresh interval.
* A failed initial sync.Once bootstrap latched done=true, pinning the client into the async-refresh herd path forever.
* locationCache.readEndpoints/writeEndpoints could self-deadlock by acquiring RLock and then calling update() which takes Lock().
* getPrefAvailableEndpointsLocked appended the customer-supplied default endpoint as a trailing fallback in every route list, so retry traversal eventually issued data-plane requests there.
* resolveServiceEndpoint fell back to the default endpoint whenever enableCrossRegionRetries was false even with non-empty availWriteLocations.

Changes:
* globalEndpointManager: add lastAttemptTime, per-flight singleflight, invalidationGen, lastUpdateErr, populated(), invalidate(); MarkEndpointUnavailable* now invalidates once per newly-unavailable endpoint.
* globalEndpointManagerPolicy: drop sync.Once; bootstrap is gated on populated() and coalesced via gem.Update's singleflight.
* clientRetryPolicy: write retries pass forceRefresh=false.
* locationCache: split Locked variants; readEndpoints/writeEndpoints drop the RLock before calling update; update() chooses regional write/read fallbacks; resolveServiceEndpoint always picks a regional endpoint when availWriteLocations is non-empty (cross-region-retries flag now only gates retry walking); getPrefAvailableEndpointsLocked no longer appends a trailing fallback.
* Only remaining data-plane -> default-endpoint route is the degenerate 'zero write regions on a write request' case.

Tests:
* cosmos_dbaccount_refresh_test.go (new): 12 regression tests covering throttle-on-failure, single-flight coalescing, write-retry single refresh, in-flight invalidation, cached bootstrap error, deadlock-free readEndpoints, default-endpoint elimination across resource types and configurations, plus 500-concurrency soak tests on healthy and failing GEMs.
* cosmos_location_cache_test.go: update 4 expectations to drop trailing default-endpoint entries from route lists.

All tests pass with -race.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 sdk/data/azcosmos/CHANGELOG.md | 3 +
 .../azcosmos/cosmos_client_retry_policy.go | 8 +-
 .../azcosmos/cosmos_dbaccount_refresh_test.go | 628 ++++++++++++++++++
 .../cosmos_global_endpoint_manager.go | 163 ++++-
 .../cosmos_global_endpoint_manager_policy.go | 26 +-
 sdk/data/azcosmos/cosmos_location_cache.go | 123 +++-
 .../azcosmos/cosmos_location_cache_test.go | 23 +-
 7 files changed, 920 insertions(+), 54 deletions(-)
 create mode 100644 sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go

diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md
index 9d0dc879ba76..66a2ce4c43f0 100644
--- a/sdk/data/azcosmos/CHANGELOG.md
+++ b/sdk/data/azcosmos/CHANGELOG.md
@@ -14,6 +14,9 @@
 
 ### Bugs Fixed
 
+* Fixed excessive `GetDatabaseAccount` (region topology) HTTP calls when using preferred regions.
Previously a failed refresh did not advance the throttle, concurrent callers each spawned a goroutine that issued its own HTTP call, write retries on `403/WriteForbidden` force-refreshed the global endpoint manager on every retry attempt, and a failed bootstrap could pin the client into a permanent refresh storm. Concurrent refreshes are now coalesced via a single in-flight pattern, failures honour the same throttle as successes, write retries refresh at most once per newly-unavailable endpoint, and a chronic bootstrap failure surfaces the cached error rather than retrying on every request. Also fixed a self-deadlock in `locationCache.readEndpoints`/`writeEndpoints` on the stale-endpoints refresh path. See [issue 25468](https://github.com/Azure/azure-sdk-for-go/issues/25468). +* Data-plane requests no longer route to the customer-supplied (default) endpoint as a fallback once the account topology is populated. Previously every route list trailed into the default endpoint, so retry traversal eventually issued data-plane requests there even when full regional metadata was available; `enableCrossRegionRetries=false` also caused single-master writes to fall back to the default endpoint instead of pinning to a regional one. The only remaining data-plane code path that targets the default endpoint is the degenerate case of an account advertising zero write regions on a write request. See [issue 25468](https://github.com/Azure/azure-sdk-for-go/issues/25468). + ### Other Changes ## 1.5.0-beta.0 (2025-06-09) diff --git a/sdk/data/azcosmos/cosmos_client_retry_policy.go b/sdk/data/azcosmos/cosmos_client_retry_policy.go index 772775cf32d4..76637336da07 100644 --- a/sdk/data/azcosmos/cosmos_client_retry_policy.go +++ b/sdk/data/azcosmos/cosmos_client_retry_policy.go @@ -142,7 +142,13 @@ func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, i } } - err := p.gem.Update(req.Raw().Context(), isWriteOperation) + // Pass forceRefresh=false. 
The MarkEndpointUnavailable* calls above + // already invalidate the GEM cache the first time an endpoint becomes + // unavailable, so the next Update will actually issue a refresh. On + // subsequent retries within the unavailability window we honour the + // 5-min throttle instead of forcing a fresh GetDatabaseAccount call per + // attempt (https://github.com/Azure/azure-sdk-for-go/issues/25468). + err := p.gem.Update(req.Raw().Context(), false) if err != nil { return false, err } diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go new file mode 100644 index 000000000000..944496c6ab45 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -0,0 +1,628 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +// Regression tests for the fixes applied for +// https://github.com/Azure/azure-sdk-for-go/issues/25468 -- excess +// GetDatabaseAccount calls observed with preferred regions configured. 
+// +// These tests cover: +// F1: failed GEM Update is throttled to refreshTimeInterval (lastAttemptTime) +// F2: concurrent Update callers are coalesced into a single HTTP call +// F3: write-retry on 403 issues at most one GEM call per logical request +// F4: a failed initial bootstrap Update is retried on the next request +// F5: locationCache.readEndpoints does not deadlock on the stale+unavailable path +// Soak: under sustained mixed load, total GEM calls respect refreshTimeInterval + +package azcosmos + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/stretchr/testify/require" +) + +// countingTransport counts how many requests reach the test endpoint and +// optionally returns a canned status / error so we can simulate +// GetDatabaseAccount behaviour. body, when set, is returned as the response +// body. 
+type countingTransport struct { + count atomic.Int64 + status int + body []byte + respErr error + delay time.Duration +} + +func (c *countingTransport) Do(req *http.Request) (*http.Response, error) { + c.count.Add(1) + if c.delay > 0 { + time.Sleep(c.delay) + } + if c.respErr != nil { + return nil, c.respErr + } + resp := &http.Response{ + StatusCode: c.status, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: jsonBody(c.body), + Request: req, + } + return resp, nil +} + +func jsonBody(b []byte) io.ReadCloser { + return &jsonReadCloser{b: b} +} + +type jsonReadCloser struct { + b []byte + i int +} + +func (r *jsonReadCloser) Read(p []byte) (int, error) { + if r.i >= len(r.b) { + return 0, io.EOF + } + n := copy(p, r.b[r.i:]) + r.i += n + return n, nil +} +func (r *jsonReadCloser) Close() error { return nil } + +func newGEMWithTransport(t *testing.T, preferred []string, transport policy.Transporter, refresh time.Duration) *globalEndpointManager { + t.Helper() + pl := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + gem, err := newGlobalEndpointManager("https://fake.documents.azure.com:443/", pl, preferred, refresh, true) + require.NoError(t, err) + return gem +} + +// ---------------------------------------------------------------------------- +// F1: a failed Update must throttle subsequent refresh attempts to +// refreshTimeInterval. The bug had every subsequent caller re-issuing +// GetDatabaseAccount because lastUpdateTime was only set on success. +// We still surface the cached error to callers (see F3b) but the HTTP call +// must not repeat within the throttle window. 
+// ---------------------------------------------------------------------------- +func TestFix1_FailedUpdateIsThrottled(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // First Update: lastAttemptTime is zero -> shouldRefresh()==true -> HTTP call -> fails. + err := gem.Update(context.Background(), false) + require.Error(t, err, "first attempt against failing endpoint must surface the error") + require.Equal(t, int64(1), transport.count.Load()) + + // Next 50 Update calls within the refresh interval return the cached + // error -- they do NOT re-issue the HTTP call. + for i := 0; i < 50; i++ { + err := gem.Update(context.Background(), false) + require.Error(t, err, "throttled Update must still surface the cached error so callers know the GEM is not populated") + } + require.Equal(t, int64(1), transport.count.Load(), + "a failed Update must be throttled exactly like a successful one") +} + +// ---------------------------------------------------------------------------- +// F2: concurrent Update callers are coalesced into a single in-flight HTTP +// call via the single-in-flight pattern in gem.Update. +// ---------------------------------------------------------------------------- +func TestFix2_ConcurrentUpdateCallersCoalesce(t *testing.T) { + // Slow transport so the first refresh is still in flight while the rest + // of the goroutines arrive. 
+ body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 100 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + const concurrency = 200 + wg := sync.WaitGroup{} + wg.Add(concurrency) + errs := make([]error, concurrency) + for i := 0; i < concurrency; i++ { + go func(idx int) { + defer wg.Done() + errs[idx] = gem.Update(context.Background(), false) + }(i) + } + wg.Wait() + for _, err := range errs { + require.NoError(t, err) + } + require.Equal(t, int64(1), transport.count.Load(), + "concurrent Update callers must coalesce into a single HTTP call") +} + +// ---------------------------------------------------------------------------- +// F3: write-retry on 403/WriteForbidden invalidates the GEM exactly once +// (when the endpoint is newly marked unavailable), then subsequent retries +// within the unavailability window are throttled. 
+// ---------------------------------------------------------------------------- +func TestFix3_WriteRetryIssuesAtMostOneGEMCall(t *testing.T) { + defaultEndpoint, err := url.Parse("https://fake.documents.azure.com:443/") + require.NoError(t, err) + + westRegion := accountRegion{Name: "West US", Endpoint: defaultEndpoint.String()} + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{westRegion}, + WriteRegions: []accountRegion{westRegion}, + }) + transport := &countingTransport{status: http.StatusOK, body: body} + gemPipeline := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", + azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + pipeline: gemPipeline, + preferredLocations: []string{"West US"}, + locationCache: CreateMockLC(*defaultEndpoint, false), + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + retry := &clientRetryPolicy{gem: gem} + + req, err := azruntime.NewRequest(context.Background(), http.MethodPost, defaultEndpoint.String()) + require.NoError(t, err) + + const writeRetries = 5 + rc := &retryContext{} + for i := 0; i < writeRetries; i++ { + shouldRetry, err := retry.attemptRetryOnEndpointFailure(req, true, rc) + require.NoError(t, err) + require.True(t, shouldRetry) + rc.retryCount++ + } + require.Equal(t, int64(1), transport.count.Load(), + "write retries within the same unavailability window must collapse to one GEM call") +} + +// ---------------------------------------------------------------------------- +// F1b: an invalidate() that fires while a refresh is in flight must NOT be +// lost. Before the fix, the in-flight leader's "set lastUpdateTime" would +// overwrite the invalidation timestamps, causing the next caller to observe +// the refresh as completed and skip the refresh that was demanded by the +// invalidation. 
The leader now snapshots invalidationGen and refuses to +// commit timestamps if it changed during the flight. +// ---------------------------------------------------------------------------- +func TestFix1b_InvalidateDuringInflightRefreshIsHonored(t *testing.T) { + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 150 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // Kick off a refresh; while it is in flight, invalidate. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = gem.Update(context.Background(), false) + }() + time.Sleep(30 * time.Millisecond) // ensure the leader has the inflight slot + gem.invalidate() + wg.Wait() + + // First refresh returned (count=1) but invalidation should have + // prevented timestamps from being committed. A subsequent Update must + // therefore issue a fresh HTTP call (count=2). + require.Equal(t, int64(1), transport.count.Load()) + require.NoError(t, gem.Update(context.Background(), false)) + require.Equal(t, int64(2), transport.count.Load(), + "invalidation during an in-flight refresh must force the next Update to actually refresh") +} + +// ---------------------------------------------------------------------------- +// F3c: concurrent MarkEndpointUnavailable* calls for the same endpoint may +// each observe wasUnavailable==false (the check is not atomic with the +// mark). Each one may call invalidate(). The single-in-flight pattern in +// Update bounds the resulting HTTP calls so the user-visible blast radius +// is at most one extra refresh per concurrent burst -- not one per marker. +// This test documents and bounds that behaviour. 
+// ---------------------------------------------------------------------------- +func TestFix3c_ConcurrentSameEndpointMarksAreBounded(t *testing.T) { + defaultEndpoint, _ := url.Parse("https://fake.documents.azure.com:443/") + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: defaultEndpoint.String()}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: defaultEndpoint.String()}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 50 * time.Millisecond} + gemPipeline := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", + azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + pipeline: gemPipeline, + preferredLocations: []string{"West US"}, + locationCache: CreateMockLC(*defaultEndpoint, false), + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + + const concurrency = 50 + wg := sync.WaitGroup{} + wg.Add(concurrency * 2) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + _ = gem.MarkEndpointUnavailableForWrite(*defaultEndpoint) + }() + go func() { + defer wg.Done() + _ = gem.Update(context.Background(), false) + }() + } + wg.Wait() + // Give any spawned refresh time to drain. + time.Sleep(200 * time.Millisecond) + + // The exact number depends on timing (some MARK calls may race with the + // in-flight refresh and invalidate it once more), but it MUST be bounded + // to a small constant -- not proportional to concurrency. + calls := transport.count.Load() + require.LessOrEqual(t, calls, int64(3), + "concurrent same-endpoint marks must produce a bounded number of GEM calls (got %d for concurrency=%d)", calls, concurrency) +} + +// F3b: a bootstrap failure is surfaced to every request while the GEM has never been successfully populated, not +// just the very first request. The cached lastUpdateErr is returned from +// throttled Update calls when !populated. 
+// ---------------------------------------------------------------------------- +func TestFix3b_BootstrapFailureIsSurfacedOnEveryRequestUntilThrottleExpires(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + for i := 0; i < 5; i++ { + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r) + require.Error(t, err, "request %d must surface the cached bootstrap error", i) + } + require.Equal(t, int64(1), transport.count.Load(), + "only one actual HTTP call must be made; the rest return the cached error") +} + +// F4: bootstrap is gated on populated(), so after a failure the next caller retries the +// bootstrap. Combined with F1, the retries are throttled to one per +// refreshTimeInterval -- they don't fan out per request. +// ---------------------------------------------------------------------------- +func TestFix4_InitialBootstrapFailureIsRetriedAndThrottled(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest, delay: 5 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + // First request: synchronous bootstrap fires the GEM HTTP call, which + // fails. The error must surface to the caller. 
+ r1, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r1.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r1) + require.Error(t, err, "the failed bootstrap error must surface to the caller") + + first := transport.count.Load() + require.Equal(t, int64(1), first) + + // Next 20 sequential requests must NOT each issue a fresh + // GetDatabaseAccount call. Bootstrap is retried on the next request but + // throttled by lastAttemptTime; subsequent requests inside the throttle + // window see ShouldRefresh()==false and skip the async refresh too. + const followUp = 20 + for i := 0; i < followUp; i++ { + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, _ = pl.Do(r) + } + // Drain any goroutines that the policy may have spawned. + time.Sleep(200 * time.Millisecond) + + total := transport.count.Load() + require.Equal(t, int64(1), total, + "failed bootstrap must be throttled, not retried on every request (got %d HTTP calls)", total) +} + +// ---------------------------------------------------------------------------- +// F5: locationCache.readEndpoints / writeEndpoints no longer self-deadlock on +// the stale+unavailable refresh path. Before the fix, this hung forever +// because RLock could not be upgraded to Lock inside refreshStaleEndpoints. 
+// ---------------------------------------------------------------------------- +func TestFix5_ReadEndpointsDoesNotDeadlock(t *testing.T) { + defaultEndpoint, _ := url.Parse("https://fake.documents.azure.com:443/") + lc := CreateMockLC(*defaultEndpoint, false) + lc.locationUnavailabilityInfoMap[*defaultEndpoint] = locationUnavailabilityInfo{ + lastCheckTime: time.Now(), + unavailableOps: read, + } + lc.lastUpdateTime = time.Now().Add(-10 * time.Minute) + + done := make(chan error, 1) + go func() { + _, err := lc.readEndpoints() + done <- err + }() + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("readEndpoints deadlocked on the stale+unavailable path -- F5 regression") + } + + done2 := make(chan error, 1) + go func() { + _, err := lc.writeEndpoints() + done2 <- err + }() + select { + case err := <-done2: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("writeEndpoints deadlocked on the stale+unavailable path -- F5 regression") + } +} + +// ---------------------------------------------------------------------------- +// Soak test: a high-concurrency burst against a healthy GEM with the default +// 5-min refresh interval should issue exactly one GetDatabaseAccount call. +// This is the headline regression guard for issue #25468. 
+// ---------------------------------------------------------------------------- +func TestRegression25468_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 20 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + const concurrency = 500 + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, _ = pl.Do(r) + }() + } + wg.Wait() + time.Sleep(300 * time.Millisecond) + + calls := transport.count.Load() + require.Equal(t, int64(1), calls, + "500 concurrent requests on a healthy client must produce exactly one GetDatabaseAccount call (got %d)", calls) +} + +// Soak test variant: the same burst against a FAILING GEM must also produce +// exactly one HTTP call, demonstrating that F1 + F2 together close the +// failure-storm path. 
+func TestRegression25468_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest, delay: 20 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + const concurrency = 500 + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, _ = pl.Do(r) + }() + } + wg.Wait() + time.Sleep(300 * time.Millisecond) + + calls := transport.count.Load() + require.Equal(t, int64(1), calls, + "500 concurrent requests against a failing GEM must still produce only one GetDatabaseAccount call (got %d)", calls) +} + +// ---------------------------------------------------------------------------- +// Default-endpoint elimination (issue #25468 followup). +// After the GEM is populated, data-plane requests must never resolve to the +// customer-supplied endpoint -- with the single exception of the degenerate +// "zero write regions" case for write requests. +// ---------------------------------------------------------------------------- + +// makeGEMWithRegions builds a populated GEM whose default endpoint host +// differs from every account-region host, so a routing fallback to the +// default endpoint is observable. 
+func makeGEMWithRegions(t *testing.T, isMultiMaster bool, preferred []string, enableCrossRegion bool) *globalEndpointManager { + t.Helper() + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: enableCrossRegion, + enableMultipleWriteLocations: isMultiMaster, + } + writeRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + } + if isMultiMaster { + writeRegions = append(writeRegions, accountRegion{Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}) + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + {Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, + {Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}, + } + require.NoError(t, lc.update(writeRegions, readRegions, preferred, &isMultiMaster)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + preferredLocations: preferred, + locationCache: lc, + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + return gem +} + +func TestDefaultEndpointElim_DataPlaneNeverHitsDefaultWhenPopulated(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + + cases := []struct { + name string + multi bool + preferred []string + }{ + {"singleMaster_withPreferred", false, []string{"West US", "East US"}}, + {"singleMaster_noPreferred", false, []string{}}, + {"multiMaster_withPreferred", true, []string{"Central US", "East US"}}, + {"multiMaster_noPreferred", true, []string{}}, + } + resourceTypes := []resourceType{resourceTypeDocument, resourceTypeCollection, resourceTypeDatabase} + + for _, tc := range cases { + t.Run(tc.name, 
func(t *testing.T) { + gem := makeGEMWithRegions(t, tc.multi, tc.preferred, true) + for _, rt := range resourceTypes { + for _, isWrite := range []bool{false, true} { + for _, useWrite := range []bool{false, true} { + for idx := 0; idx < 25; idx++ { + ep := gem.ResolveServiceEndpoint(idx, rt, isWrite, useWrite) + require.NotEqual(t, defaultHost, ep.Host, + "resourceType=%v isWrite=%v useWrite=%v idx=%d resolved to default endpoint", rt, isWrite, useWrite, idx) + } + } + } + } + }) + } +} + +func TestDefaultEndpointElim_SessionRetryOnSingleMasterRoutesToRegionalWrite(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + gem := makeGEMWithRegions(t, false /*single master*/, []string{"West US"}, true) + + // Simulate the session-not-available retry: useWriteEndpoint=true on a + // single-master read. The resolved endpoint must be the regional write + // region, not the customer endpoint. + ep := gem.ResolveServiceEndpoint(1, resourceTypeDocument, false /*isWrite*/, true /*useWrite*/) + require.NotEqual(t, defaultHost, ep.Host) + require.Contains(t, ep.Host, "east-us") +} + +func TestDefaultEndpointElim_UnmatchedPreferredRegionRoutesToRegional(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + // Customer set preferred regions that the account does NOT advertise. 
+ gem := makeGEMWithRegions(t, false, []string{"South Africa North", "France Central"}, true) + + for idx := 0; idx < 10; idx++ { + readEP := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false, false) + require.NotEqual(t, defaultHost, readEP.Host, + "read with unmatched preferred region resolved to default endpoint at idx=%d", idx) + writeEP := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, true, false) + require.NotEqual(t, defaultHost, writeEP.Host, + "write with unmatched preferred region resolved to default endpoint at idx=%d", idx) + } +} + +func TestDefaultEndpointElim_ZeroWriteRegionsRetainsDefaultFallback(t *testing.T) { + // Degenerate account metadata: no write regions advertised. Per the + // project decision, this is the ONLY remaining data-plane code path + // where a write request routes to the customer-supplied endpoint. + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: true, + enableMultipleWriteLocations: false, + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + } + noWrites := []accountRegion{} // explicit zero + multi := false + require.NoError(t, lc.update(noWrites, readRegions, []string{}, &multi)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), locationCache: lc, + refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), + } + + ep := gem.ResolveServiceEndpoint(0, resourceTypeDocument, true /*isWrite*/, false) + require.Equal(t, defaultEndpoint.Host, ep.Host, + "zero-write-regions write must still route to the customer endpoint (documented degenerate case)") +} + +func TestDefaultEndpointElim_CrossRegionRetriesDisabledStillRoutesRegional(t *testing.T) { + 
defaultHost := "customer-endpoint.documents.azure.com:443" + gem := makeGEMWithRegions(t, false, []string{"East US"}, false /*enableCrossRegion=false*/) + + // With cross-region retries disabled, the primary endpoint must still + // be regional; retries must pin to the same region (locationIndex=0 + // regardless of the caller's retry count). + for idx := 0; idx < 10; idx++ { + ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, true, false) + require.NotEqual(t, defaultHost, ep.Host, + "cross-region-retries=false must not fall back to default endpoint at idx=%d", idx) + require.Contains(t, ep.Host, "east-us") + } +} + +// TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion verifies the +// corollary of TestDefaultEndpointElim_ZeroWriteRegionsRetainsDefaultFallback: +// when the account advertises zero write regions, READS must still resolve to +// an advertised read region rather than the customer endpoint. Only the +// write request in this scenario is allowed to fall through to default. 
+func TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion(t *testing.T) { +defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") +lc := &locationCache{ +defaultEndpoint: *defaultEndpoint, +locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, +unavailableLocationExpirationTime: defaultExpirationTime, +enableCrossRegionRetries: true, +enableMultipleWriteLocations: false, +} +readRegions := []accountRegion{ +{Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, +{Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, +} +multi := false +require.NoError(t, lc.update([]accountRegion{}, readRegions, []string{}, &multi)) +gem := &globalEndpointManager{ +clientEndpoint: defaultEndpoint.String(), locationCache: lc, +refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), +} + +for idx := 0; idx < 10; idx++ { +ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false /*isWrite*/, false) +require.NotEqual(t, defaultEndpoint.Host, ep.Host, +"reads must route to a read region even when there are zero write regions; got default at idx=%d", idx) +} +} diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index e915353cf260..709265771ad7 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -24,8 +24,36 @@ type globalEndpointManager struct { preferredLocations []string locationCache *locationCache refreshTimeInterval time.Duration - gemMutex sync.RWMutex + gemMutex sync.Mutex lastUpdateTime time.Time + // lastAttemptTime records the most recent Update attempt regardless of + // outcome. shouldRefresh() honours it so a failed GetAccountProperties is + // throttled to refreshTimeInterval just like a successful one. 
This + // prevents the failure-loop described in + // https://github.com/Azure/azure-sdk-for-go/issues/25468 where every + // caller after a failed Update would re-issue the HTTP call immediately. + lastAttemptTime time.Time + // lastUpdateErr is the error from the most recent refresh attempt. It is + // returned to callers that hit the throttle window before the GEM has + // ever been successfully populated, so a chronic bootstrap failure is + // surfaced on every request rather than silently swallowed. + lastUpdateErr error + // inflight coalesces concurrent Update callers: only the first does the + // HTTP call; the rest wait on the per-flight done channel and read + // per-flight err. Each refresh has its own *updateFlight so late waiters + // cannot accidentally read a subsequent flight's error. + inflight *updateFlight + // invalidationGen is bumped by invalidate(). The Update leader snapshots + // it before the HTTP call and, if it changed by completion, declines to + // advance lastUpdateTime/lastAttemptTime -- so an invalidation that + // happens during an in-flight refresh is not lost. + invalidationGen uint64 +} + +// updateFlight tracks a single in-flight refresh. +type updateFlight struct { + done chan struct{} + err error } func newGlobalEndpointManager(clientEndpoint string, pipeline azruntime.Pipeline, preferredLocations []string, refreshTimeInterval time.Duration, enableCrossRegionRetries bool) (*globalEndpointManager, error) { @@ -59,11 +87,45 @@ func (gem *globalEndpointManager) GetReadEndpoints() ([]url.URL, error) { } func (gem *globalEndpointManager) MarkEndpointUnavailableForWrite(endpoint url.URL) error { - return gem.locationCache.markEndpointUnavailableForWrite(endpoint) + // Snapshot the unavailability state before marking so we can decide + // whether this is a "newly unavailable" event. 
A new event invalidates + // the GEM cache so the next Update(false) actually fires -- the first + // 403/WriteForbidden or network error for an endpoint may indicate a + // failover and we want to learn about new write regions promptly. + // Subsequent retries within the unavailability window do not invalidate. + wasUnavailable := gem.locationCache.isEndpointUnavailable(endpoint, write) + if err := gem.locationCache.markEndpointUnavailableForWrite(endpoint); err != nil { + return err + } + if !wasUnavailable { + gem.invalidate() + } + return nil } func (gem *globalEndpointManager) MarkEndpointUnavailableForRead(endpoint url.URL) error { - return gem.locationCache.markEndpointUnavailableForRead(endpoint) + wasUnavailable := gem.locationCache.isEndpointUnavailable(endpoint, read) + if err := gem.locationCache.markEndpointUnavailableForRead(endpoint); err != nil { + return err + } + if !wasUnavailable { + gem.invalidate() + } + return nil +} + +// invalidate forces the next non-force Update to actually issue a refresh by +// clearing both lastUpdateTime and lastAttemptTime, and bumps +// invalidationGen so that a refresh currently in flight cannot mask the +// invalidation by writing the post-call timestamps. Used when we learn about +// a newly-unavailable endpoint and want to discover potential failover +// targets without waiting for the next refresh interval. +func (gem *globalEndpointManager) invalidate() { + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() + gem.lastUpdateTime = time.Time{} + gem.lastAttemptTime = time.Time{} + gem.invalidationGen++ } func (gem *globalEndpointManager) GetEndpointLocation(endpoint url.URL) string { @@ -82,39 +144,116 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() { gem.locationCache.refreshStaleEndpoints() } +// populated reports whether the GEM has been successfully refreshed at least +// once. 
Used by the policy's bootstrap path to decide whether the first +// request on a new client should synchronously wait for GetDatabaseAccount. +// Repeated calls after the first success are cheap (one mutex acquisition). +func (gem *globalEndpointManager) populated() bool { + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() + return !gem.lastUpdateTime.IsZero() +} + func (gem *globalEndpointManager) ShouldRefresh() bool { - gem.gemMutex.RLock() - defer gem.gemMutex.RUnlock() + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() return gem.shouldRefresh() } func (gem *globalEndpointManager) shouldRefresh() bool { - return time.Since(gem.lastUpdateTime) > gem.refreshTimeInterval + // Honor whichever happened more recently: a successful update or an + // attempt that failed. Failures must be throttled too, otherwise a + // failing endpoint causes every caller to re-issue GetDatabaseAccount + // immediately (issue #25468). + last := gem.lastUpdateTime + if gem.lastAttemptTime.After(last) { + last = gem.lastAttemptTime + } + return time.Since(last) > gem.refreshTimeInterval } func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, resourceType resourceType, isWriteOperation, useWriteEndpoint bool) url.URL { return gem.locationCache.resolveServiceEndpoint(locationIndex, resourceType, isWriteOperation, useWriteEndpoint) } +// Update refreshes the GEM cache by calling GetAccountProperties when needed. +// Concurrent callers are coalesced via a single-in-flight pattern so at most +// one HTTP call is in flight per client at any time. Both successes and +// failures advance lastAttemptTime so the next refresh is throttled to +// refreshTimeInterval -- this prevents the failure-storm described in +// https://github.com/Azure/azure-sdk-for-go/issues/25468. 
+// +// If the GEM has never been successfully populated and the throttle is +// active, Update returns the cached error from the most recent failed +// attempt so a chronic bootstrap failure is surfaced on every request +// rather than silently swallowed. func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) error { gem.gemMutex.Lock() - defer gem.gemMutex.Unlock() if !gem.shouldRefresh() && !forceRefresh { - return nil + // Throttled. Surface the cached error if we have never succeeded. + var cached error + if gem.lastUpdateTime.IsZero() { + cached = gem.lastUpdateErr + } + gem.gemMutex.Unlock() + return cached } + if gem.inflight != nil { + // Another goroutine is performing a refresh. Wait for it and share + // its result rather than spawning a duplicate HTTP call. The result + // lives on the per-flight struct so subsequent flights cannot + // overwrite it. + flight := gem.inflight + gem.gemMutex.Unlock() + <-flight.done + return flight.err + } + // We are the leader. Publish the inflight flight and snapshot the + // invalidation generation, then release the lock while we perform the + // HTTP call so ShouldRefresh and other non-Update paths don't block on + // a network round-trip. + flight := &updateFlight{done: make(chan struct{})} + gem.inflight = flight + genAtStart := gem.invalidationGen + gem.gemMutex.Unlock() + + err := gem.refreshOnce(ctx) + + gem.gemMutex.Lock() + flight.err = err + gem.lastUpdateErr = err + if gem.invalidationGen == genAtStart { + // No invalidation occurred during the flight, so commit the + // timestamps and let the throttle take effect. + gem.lastAttemptTime = time.Now() + if err == nil { + gem.lastUpdateTime = gem.lastAttemptTime + } + } + // If invalidationGen changed, leave the timestamps untouched so the + // next caller observes shouldRefresh()==true and performs a fresh + // refresh that reflects the post-invalidation state. 
+ gem.inflight = nil + gem.gemMutex.Unlock() + close(flight.done) + return err +} + +// refreshOnce performs the actual GetAccountProperties HTTP call and +// propagates the result into the location cache. It must not hold gemMutex. +func (gem *globalEndpointManager) refreshOnce(ctx context.Context) error { accountProperties, err := gem.GetAccountProperties(ctx) if err != nil { return fmt.Errorf("failed to retrieve account properties: %v", err) } - err = gem.locationCache.update( + if err := gem.locationCache.update( accountProperties.WriteRegions, accountProperties.ReadRegions, gem.preferredLocations, - &accountProperties.EnableMultipleWriteLocations) - if err != nil { + &accountProperties.EnableMultipleWriteLocations, + ); err != nil { return fmt.Errorf("failed to update location cache: %v", err) } - gem.lastUpdateTime = time.Now() return nil } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go index 357a2240fdb3..eba333f8791c 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go @@ -6,27 +6,35 @@ package azcosmos import ( "context" "net/http" - "sync" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" ) type globalEndpointManagerPolicy struct { - gem *globalEndpointManager - once sync.Once + gem *globalEndpointManager } func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, error) { + // Synchronous bootstrap: while the GEM has never been successfully + // populated, every request synchronously calls Update. Concurrent + // callers are coalesced inside gem.Update via the single-in-flight + // pattern, so at most one HTTP call is in flight. 
If the call fails + // the throttle in gem.Update (lastAttemptTime) ensures subsequent + // bootstrap retries respect refreshTimeInterval -- preventing the + // failure storm described in + // https://github.com/Azure/azure-sdk-for-go/issues/25468. var err error - p.once.Do(func() { + if !p.gem.populated() { // Use the same context, but without the cancellation signal. - // We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request. - err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), true) - }) + // We DO want to preserve things like context values, but the GEM + // update needs to complete fully, even if the user cancels the + // triggering request. + err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) + } if p.gem.ShouldRefresh() { go func() { - // Use the same context, but without the cancellation signal. - // We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request. + // Concurrent goroutines spawned here are coalesced inside + // gem.Update via the single-in-flight pattern. _ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) }() } diff --git a/sdk/data/azcosmos/cosmos_location_cache.go b/sdk/data/azcosmos/cosmos_location_cache.go index 51f07e51a280..a6c1a8f5a5c0 100644 --- a/sdk/data/azcosmos/cosmos_location_cache.go +++ b/sdk/data/azcosmos/cosmos_location_cache.go @@ -78,7 +78,17 @@ func newLocationCache(prefLocations []string, defaultEndpoint url.URL, enableCro } } +// update refreshes the location cache. It acquires mapMutex internally; do +// not call it while holding mapMutex (use updateLocked from inside such +// sections). Public callers go through update; internal callers that already +// hold the write lock call updateLocked. 
func (lc *locationCache) update(writeLocations []accountRegion, readLocations []accountRegion, prefList []string, enableMultipleWriteLocations *bool) error { + lc.mapMutex.Lock() + defer lc.mapMutex.Unlock() + return lc.updateLocked(writeLocations, readLocations, prefList, enableMultipleWriteLocations) +} + +func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocations []accountRegion, prefList []string, enableMultipleWriteLocations *bool) error { nextLoc := copyDatabaseAccountLocationsInfo(lc.locationInfo) if prefList != nil { nextLoc.prefLocations = prefList @@ -86,7 +96,7 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations [] if enableMultipleWriteLocations != nil { lc.enableMultipleWriteLocations = *enableMultipleWriteLocations } - lc.refreshStaleEndpoints() + lc.refreshStaleEndpointsLocked() if readLocations != nil { availReadEndpointsByLocation, availReadLocations, err := getEndpointsByLocation(readLocations) if err != nil { @@ -105,8 +115,30 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations [] nextLoc.availWriteLocations = availWriteLocations } - nextLoc.writeEndpoints = lc.getPrefAvailableEndpoints(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, write, lc.defaultEndpoint) - nextLoc.readEndpoints = lc.getPrefAvailableEndpoints(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, read, nextLoc.writeEndpoints[0]) + // Choose regional fallbacks so the route lists never trail into the + // customer-supplied default endpoint. See issue #25468 / followup + // "no data-plane traffic to default endpoint": the only data-plane code + // path that may still hit the default endpoint is the degenerate "zero + // write regions on a write" case. 
+ writeFallback := lc.defaultEndpoint + if len(nextLoc.availWriteLocations) > 0 { + if ep, ok := nextLoc.availWriteEndpointsByLocation[nextLoc.availWriteLocations[0]]; ok { + writeFallback = ep + } + } + nextLoc.writeEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, write, writeFallback) + // Prefer the first available read region for the read fallback. Only + // fall back to the first write endpoint (or, transitively, the default + // endpoint) when the account advertises zero read regions -- accounts + // with valid read regions must never resolve a read to the default + // endpoint. + readFallback := nextLoc.writeEndpoints[0] + if len(nextLoc.availReadLocations) > 0 { + if ep, ok := nextLoc.availReadEndpointsByLocation[nextLoc.availReadLocations[0]]; ok { + readFallback = ep + } + } + nextLoc.readEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, read, readFallback) lc.lastUpdateTime = time.Now() lc.locationInfo = nextLoc // TODO: log @@ -115,11 +147,23 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations [] func (lc *locationCache) resolveServiceEndpoint(locationIndex int, resourceType resourceType, isWriteOperation, useWriteEndpoint bool) url.URL { if (isWriteOperation || useWriteEndpoint) && !lc.canUseMultipleWriteLocsToRoute(resourceType) { - if lc.enableCrossRegionRetries && len(lc.locationInfo.availWriteLocations) > 0 { - locationIndex = min(locationIndex%2, len(lc.locationInfo.availWriteLocations)-1) + if len(lc.locationInfo.availWriteLocations) > 0 { + // Prefer a regional endpoint. The cross-region-retries flag only + // gates whether we walk across regions on retry; it must not + // cause the primary endpoint to fall back to the default + // endpoint when regional metadata is available + // (issue #25468 followup: no data-plane traffic to default). 
+ if lc.enableCrossRegionRetries { + locationIndex = min(locationIndex%2, len(lc.locationInfo.availWriteLocations)-1) + } else { + locationIndex = 0 + } writeLocation := lc.locationInfo.availWriteLocations[locationIndex] return lc.locationInfo.availWriteEndpointsByLocation[writeLocation] } + // Degenerate case: account metadata advertises zero write regions. + // Per the project decision this is the ONLY data-plane code path + // where we still fall back to the customer-supplied endpoint. return lc.defaultEndpoint } @@ -134,27 +178,38 @@ func (lc *locationCache) canUseMultipleWriteLocsToRoute(resourceType resourceTyp return lc.canUseMultipleWriteLocs() && resourceType == resourceTypeDocument } +// readEndpoints returns the cached preferred read endpoints, refreshing the +// stale-endpoint set if the cache hasn't been updated within the +// unavailableLocationExpirationTime AND at least one unavailability entry is +// recorded. The refresh path used to call lc.update while still holding +// mapMutex.RLock, which deadlocks with refreshStaleEndpoints's Lock() +// (sync.RWMutex cannot upgrade RLock to Lock). We now capture the staleness +// decision under RLock, release it, and let update acquire the write lock. 
func (lc *locationCache) readEndpoints() ([]url.URL, error) { lc.mapMutex.RLock() - defer lc.mapMutex.RUnlock() - if time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 { - err := lc.update(nil, nil, nil, nil) - if err != nil { + stale := time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 + lc.mapMutex.RUnlock() + if stale { + if err := lc.update(nil, nil, nil, nil); err != nil { return nil, err } } + lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() return lc.locationInfo.readEndpoints, nil } func (lc *locationCache) writeEndpoints() ([]url.URL, error) { lc.mapMutex.RLock() - defer lc.mapMutex.RUnlock() - if time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 { - err := lc.update(nil, nil, nil, nil) - if err != nil { + stale := time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 + lc.mapMutex.RUnlock() + if stale { + if err := lc.update(nil, nil, nil, nil); err != nil { return nil, err } } + lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() return lc.locationInfo.writeEndpoints, nil } @@ -198,20 +253,18 @@ func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) error func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) error { now := time.Now() lc.mapMutex.Lock() + defer lc.mapMutex.Unlock() if info, ok := lc.locationUnavailabilityInfoMap[endpoint]; ok { info.lastCheckTime = now info.unavailableOps |= op lc.locationUnavailabilityInfoMap[endpoint] = info } else { - info = locationUnavailabilityInfo{ + lc.locationUnavailabilityInfoMap[endpoint] = locationUnavailabilityInfo{ lastCheckTime: now, unavailableOps: op, } - lc.locationUnavailabilityInfoMap[endpoint] = info } - lc.mapMutex.Unlock() - err := lc.update(nil, nil, nil, nil) - return err + return 
lc.updateLocked(nil, nil, nil, nil) } func (lc *locationCache) databaseAccountRead(dbAcct accountProperties) error { @@ -221,9 +274,12 @@ func (lc *locationCache) databaseAccountRead(dbAcct accountProperties) error { func (lc *locationCache) refreshStaleEndpoints() { lc.mapMutex.Lock() defer lc.mapMutex.Unlock() + lc.refreshStaleEndpointsLocked() +} + +func (lc *locationCache) refreshStaleEndpointsLocked() { for endpoint, info := range lc.locationUnavailabilityInfoMap { - t := time.Since(info.lastCheckTime) - if t > lc.unavailableLocationExpirationTime { + if time.Since(info.lastCheckTime) > lc.unavailableLocationExpirationTime { delete(lc.locationUnavailabilityInfoMap, endpoint) } } @@ -231,23 +287,26 @@ func (lc *locationCache) refreshStaleEndpoints() { func (lc *locationCache) isEndpointUnavailable(endpoint url.URL, ops requestedOperations) bool { lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() + return lc.isEndpointUnavailableLocked(endpoint, ops) +} + +func (lc *locationCache) isEndpointUnavailableLocked(endpoint url.URL, ops requestedOperations) bool { info, ok := lc.locationUnavailabilityInfoMap[endpoint] - lc.mapMutex.RUnlock() if ops == none || !ok || ops&info.unavailableOps != ops { return false } return time.Since(info.lastCheckTime) < lc.unavailableLocationExpirationTime } -func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url.URL, locs []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL { +func (lc *locationCache) getPrefAvailableEndpointsLocked(endpointsByLoc map[string]url.URL, locs []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL { endpoints := make([]url.URL, 0) if lc.enableCrossRegionRetries { if lc.canUseMultipleWriteLocs() || availOps&read != 0 { unavailEndpoints := make([]url.URL, 0) - unavailEndpoints = append(unavailEndpoints, fallbackEndpoint) for _, loc := range lc.locationInfo.prefLocations { - if endpoint, ok := endpointsByLoc[loc]; ok && endpoint != 
fallbackEndpoint { - if lc.isEndpointUnavailable(endpoint, availOps) { + if endpoint, ok := endpointsByLoc[loc]; ok { + if lc.isEndpointUnavailableLocked(endpoint, availOps) { unavailEndpoints = append(unavailEndpoints, endpoint) } else { endpoints = append(endpoints, endpoint) @@ -264,6 +323,13 @@ func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url } } if len(endpoints) == 0 { + // Last resort: none of the customer's preferred regions matched the + // account's regions (or cross-region retries are off and the + // non-multi-master read branch yielded nothing, or the account + // itself advertises zero regions). The caller passes a regional + // fallback whenever availWriteLocations is non-empty, so the only + // time this is the customer-supplied default endpoint is the + // degenerate "zero regions" case approved for issue #25468 followup. endpoints = append(endpoints, fallbackEndpoint) } return endpoints @@ -291,6 +357,13 @@ func newDatabaseAccountLocationsInfo(prefLocations []string, defaultEndpoint url availReadLocs := make([]string, 0) availWriteEndpointsByLocation := make(map[string]url.URL) availReadEndpointsByLocation := make(map[string]url.URL) + // Pre-populated seed: the lists contain defaultEndpoint until the first + // successful Update() replaces them with regional endpoints. This is + // safe because the pipeline policy (globalEndpointManagerPolicy) blocks + // data-plane requests on a synchronous bootstrap and surfaces the GEM + // error if it fails -- so resolveServiceEndpoint is never consulted for + // a real data-plane request while these seeded values are still in + // effect. See issue #25468 followup. 
writeEndpoints := []url.URL{defaultEndpoint} readEndpoints := []url.URL{defaultEndpoint} return &databaseAccountLocationsInfo{ diff --git a/sdk/data/azcosmos/cosmos_location_cache_test.go b/sdk/data/azcosmos/cosmos_location_cache_test.go index b7b2bcb82ff6..d8588b38b42b 100644 --- a/sdk/data/azcosmos/cosmos_location_cache_test.go +++ b/sdk/data/azcosmos/cosmos_location_cache_test.go @@ -260,9 +260,12 @@ func TestGetPrefAvailableEndpoints(t *testing.T) { } // loc1: unavailable, loc2: available, loc5: non-existent lc.locationInfo.prefLocations = []string{loc1.Name, loc2.Name, "location5"} - prefWriteEndpoints := lc.getPrefAvailableEndpoints(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, write, lc.defaultEndpoint) - // loc2: preferred + available, default: fallback endpoint, loc1: unavailable + preferred - expectedWriteEndpoints := []*url.URL{loc2Endpoint, defaultEndpoint, loc1Endpoint} + prefWriteEndpoints := lc.getPrefAvailableEndpointsLocked(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, write, lc.defaultEndpoint) + // loc2: preferred + available; loc1: unavailable + preferred (moved to + // tail). The trailing default-endpoint fallback was removed when we + // stopped routing data-plane traffic to the customer-supplied endpoint + // (issue #25468 followup). + expectedWriteEndpoints := []*url.URL{loc2Endpoint, loc1Endpoint} for i, endpoint := range expectedWriteEndpoints { if endpoint.String() != prefWriteEndpoints[i].String() { @@ -281,7 +284,11 @@ func TestReadEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - expectedReadEndpoints := []*url.URL{loc2Endpoint, loc4Endpoint, loc1Endpoint} + // Before issue #25468 followup the code skipped loc1 in the main pref + // loop (because it equalled the write fallback), pushing it to the tail. + // Now loc1 is included in preferred order, giving a more intuitive + // result. 
+ expectedReadEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc4Endpoint} actualReadEndpoints, err := lc.readEndpoints() if err != nil { t.Fatalf("Received error getting read endpoints: %s", err.Error()) @@ -301,7 +308,7 @@ func TestReadEndpoints(t *testing.T) { if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - expectedReadEndpoints = []*url.URL{loc4Endpoint, loc1Endpoint, loc2Endpoint} + expectedReadEndpoints = []*url.URL{loc1Endpoint, loc4Endpoint, loc2Endpoint} actualReadEndpoints, err = lc.readEndpoints() if err != nil { t.Fatalf("Received error getting read endpoints: %s", err.Error()) @@ -329,7 +336,9 @@ func TestWriteEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - expectedWriteEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc3Endpoint, defaultEndpoint} + // Trailing default-endpoint fallback was removed for issue #25468 + // followup: route lists must contain only regional endpoints. 
+ expectedWriteEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc3Endpoint} actualWriteEndpoints, err := lc.writeEndpoints() if err != nil { t.Fatalf("Received error getting write endpoints: %s", err.Error()) @@ -349,7 +358,7 @@ func TestWriteEndpoints(t *testing.T) { if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - expectedWriteEndpoints = []*url.URL{loc2Endpoint, loc3Endpoint, defaultEndpoint, loc1Endpoint} + expectedWriteEndpoints = []*url.URL{loc2Endpoint, loc3Endpoint, loc1Endpoint} actualWriteEndpoints, err = lc.writeEndpoints() if err != nil { t.Fatalf("Received error getting write endpoints: %s", err.Error()) From 3c84ef1563f45a74b77cf61e55ee7c5bfd3b19f2 Mon Sep 17 00:00:00 2001 From: Tomas Varon Date: Tue, 19 May 2026 19:58:42 -0700 Subject: [PATCH 2/8] azcosmos: address deep-review findings (panic-safety, atomic mark, test hardening) Follow-up to PR #26815 / issue #25468 addressing findings from the deep-review pass: 1. Panic-safe gem.Update: wrap refreshOnce in a deferred cleanup so a panic in the HTTP pipeline (or any code refreshOnce transitively calls) cannot leak gem.inflight / leave flight.done unclosed -- which would have permanently wedged every subsequent Update caller. The panic is re-raised after cleanup. 2. Atomic check-and-mark in locationCache.markEndpointUnavailable: the helper now returns wasAlreadyUnavailable from inside the same mapMutex critical section that performs the mark. This eliminates the check-then-act race between MarkEndpointUnavailableFor* and isEndpointUnavailable that previously let multiple concurrent markers each call invalidate(). Tightens TestFix3c bound from <=3 to a provable <=2. 3. TestFix4 hardening: assert require.Error on each follow-up request, so the test cannot pass if a regression made populated() return true and silently skip the bootstrap. All tests pass with -race; gofmt and vet clean. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azcosmos/cosmos_dbaccount_refresh_test.go | 66 ++++++++------- .../cosmos_global_endpoint_manager.go | 82 ++++++++++++------- sdk/data/azcosmos/cosmos_location_cache.go | 15 +++- .../azcosmos/cosmos_location_cache_test.go | 16 ++-- 4 files changed, 107 insertions(+), 72 deletions(-) diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go index 944496c6ab45..d46d16b63f4f 100644 --- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -267,12 +267,14 @@ func TestFix3c_ConcurrentSameEndpointMarksAreBounded(t *testing.T) { // Give any spawned refresh time to drain. time.Sleep(200 * time.Millisecond) - // The exact number depends on timing (some MARK calls may race with the - // in-flight refresh and invalidate it once more), but it MUST be bounded - // to a small constant -- not proportional to concurrency. + // With the atomic check-and-mark in markEndpointUnavailable, only the + // FIRST goroutine to win the mapMutex Lock observes wasAlreadyUnavailable + // == false and triggers invalidate(). All other markers see true and + // skip the invalidation. The leader's refresh + at most one + // post-invalidation refresh therefore bounds the total to 2. calls := transport.count.Load() - require.LessOrEqual(t, calls, int64(3), - "concurrent same-endpoint marks must produce a bounded number of GEM calls (got %d for concurrency=%d)", calls, concurrency) + require.LessOrEqual(t, calls, int64(2), + "concurrent same-endpoint marks must produce a tightly-bounded number of GEM calls (got %d for concurrency=%d)", calls, concurrency) } // to every request while the GEM has never been successfully populated, not @@ -327,11 +329,17 @@ func TestFix4_InitialBootstrapFailureIsRetriedAndThrottled(t *testing.T) { // GetDatabaseAccount call. 
Bootstrap is retried on the next request but // throttled by lastAttemptTime; subsequent requests inside the throttle // window see ShouldRefresh()==false and skip the async refresh too. + // Capture the error from each follow-up to confirm the bootstrap path + // is actually entered (returning the cached error) -- this prevents a + // regression where populated() accidentally returned true and the + // bootstrap was silently skipped. const followUp = 20 for i := 0; i < followUp; i++ { r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) - _, _ = pl.Do(r) + _, err := pl.Do(r) + require.Error(t, err, + "follow-up request %d must surface the cached bootstrap error -- if it doesn't, populated() is incorrectly returning true and we're silently routing to an unpopulated GEM", i) } // Drain any goroutines that the policy may have spawned. time.Sleep(200 * time.Millisecond) @@ -601,28 +609,28 @@ func TestDefaultEndpointElim_CrossRegionRetriesDisabledStillRoutesRegional(t *te // an advertised read region rather than the customer endpoint. Only the // write request in this scenario is allowed to fall through to default. 
func TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion(t *testing.T) { -defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") -lc := &locationCache{ -defaultEndpoint: *defaultEndpoint, -locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, -unavailableLocationExpirationTime: defaultExpirationTime, -enableCrossRegionRetries: true, -enableMultipleWriteLocations: false, -} -readRegions := []accountRegion{ -{Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, -{Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, -} -multi := false -require.NoError(t, lc.update([]accountRegion{}, readRegions, []string{}, &multi)) -gem := &globalEndpointManager{ -clientEndpoint: defaultEndpoint.String(), locationCache: lc, -refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), -} + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: true, + enableMultipleWriteLocations: false, + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + {Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, + } + multi := false + require.NoError(t, lc.update([]accountRegion{}, readRegions, []string{}, &multi)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), locationCache: lc, + refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), + } -for idx := 0; idx < 10; idx++ { -ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false /*isWrite*/, false) -require.NotEqual(t, defaultEndpoint.Host, ep.Host, -"reads must route to a read region even when there are zero write regions; got default at idx=%d", 
idx) -} + for idx := 0; idx < 10; idx++ { + ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false /*isWrite*/, false) + require.NotEqual(t, defaultEndpoint.Host, ep.Host, + "reads must route to a read region even when there are zero write regions; got default at idx=%d", idx) + } } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index 709265771ad7..cbe2a8bccc81 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -87,28 +87,32 @@ func (gem *globalEndpointManager) GetReadEndpoints() ([]url.URL, error) { } func (gem *globalEndpointManager) MarkEndpointUnavailableForWrite(endpoint url.URL) error { - // Snapshot the unavailability state before marking so we can decide - // whether this is a "newly unavailable" event. A new event invalidates - // the GEM cache so the next Update(false) actually fires -- the first - // 403/WriteForbidden or network error for an endpoint may indicate a - // failover and we want to learn about new write regions promptly. - // Subsequent retries within the unavailability window do not invalidate. - wasUnavailable := gem.locationCache.isEndpointUnavailable(endpoint, write) - if err := gem.locationCache.markEndpointUnavailableForWrite(endpoint); err != nil { + // markEndpointUnavailableForWrite atomically reports whether the + // endpoint was already unavailable for write in the same critical + // section that performs the mark. This eliminates the check-then-act + // race that would otherwise let concurrent markers all observe + // "wasn't unavailable" and each invalidate the GEM. A new event + // invalidates the GEM cache so the next Update(false) actually fires + // -- the first 403/WriteForbidden or network error for an endpoint + // may indicate a failover and we want to learn about new write + // regions promptly. 
Subsequent retries within the unavailability + // window do not invalidate. + wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForWrite(endpoint) + if err != nil { return err } - if !wasUnavailable { + if !wasAlreadyUnavailable { gem.invalidate() } return nil } func (gem *globalEndpointManager) MarkEndpointUnavailableForRead(endpoint url.URL) error { - wasUnavailable := gem.locationCache.isEndpointUnavailable(endpoint, read) - if err := gem.locationCache.markEndpointUnavailableForRead(endpoint); err != nil { + wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForRead(endpoint) + if err != nil { return err } - if !wasUnavailable { + if !wasAlreadyUnavailable { gem.invalidate() } return nil @@ -217,25 +221,41 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) genAtStart := gem.invalidationGen gem.gemMutex.Unlock() - err := gem.refreshOnce(ctx) - - gem.gemMutex.Lock() - flight.err = err - gem.lastUpdateErr = err - if gem.invalidationGen == genAtStart { - // No invalidation occurred during the flight, so commit the - // timestamps and let the throttle take effect. - gem.lastAttemptTime = time.Now() - if err == nil { - gem.lastUpdateTime = gem.lastAttemptTime - } - } - // If invalidationGen changed, leave the timestamps untouched so the - // next caller observes shouldRefresh()==true and performs a fresh - // refresh that reflects the post-invalidation state. - gem.inflight = nil - gem.gemMutex.Unlock() - close(flight.done) + // Panic-safe cleanup: if refreshOnce (or anything it transitively calls + // -- the pipeline, JSON unmarshal, locationCache.update) panics, we + // MUST still clear gem.inflight and close flight.done, otherwise every + // subsequent Update caller blocks forever on <-flight.done. We capture + // any panic, record it as the flight error, and re-panic after cleanup. 
+ var err error + func() { + defer func() { + r := recover() + gem.gemMutex.Lock() + if r != nil && err == nil { + err = fmt.Errorf("panic in GEM refresh: %v", r) + } + flight.err = err + gem.lastUpdateErr = err + if gem.invalidationGen == genAtStart { + // No invalidation occurred during the flight, so commit the + // timestamps and let the throttle take effect. + gem.lastAttemptTime = time.Now() + if err == nil { + gem.lastUpdateTime = gem.lastAttemptTime + } + } + // If invalidationGen changed, leave the timestamps untouched so + // the next caller observes shouldRefresh()==true and performs a + // fresh refresh that reflects the post-invalidation state. + gem.inflight = nil + gem.gemMutex.Unlock() + close(flight.done) + if r != nil { + panic(r) + } + }() + err = gem.refreshOnce(ctx) + }() return err } diff --git a/sdk/data/azcosmos/cosmos_location_cache.go b/sdk/data/azcosmos/cosmos_location_cache.go index a6c1a8f5a5c0..aefd435795ce 100644 --- a/sdk/data/azcosmos/cosmos_location_cache.go +++ b/sdk/data/azcosmos/cosmos_location_cache.go @@ -242,19 +242,26 @@ func (lc *locationCache) canUseMultipleWriteLocs() bool { return lc.enableMultipleWriteLocations } -func (lc *locationCache) markEndpointUnavailableForRead(endpoint url.URL) error { +func (lc *locationCache) markEndpointUnavailableForRead(endpoint url.URL) (wasAlreadyUnavailable bool, err error) { return lc.markEndpointUnavailable(endpoint, read) } -func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) error { +func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) (wasAlreadyUnavailable bool, err error) { return lc.markEndpointUnavailable(endpoint, write) } -func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) error { +// markEndpointUnavailable atomically samples whether the endpoint was already +// unavailable for `op` and records the unavailability. 
Returning the prior +// state from inside the same critical section that performs the mark +// eliminates the check-then-act race exploited by concurrent callers (see +// issue #25468 followup: bound on concurrent same-endpoint marks). +func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) (wasAlreadyUnavailable bool, err error) { now := time.Now() lc.mapMutex.Lock() defer lc.mapMutex.Unlock() if info, ok := lc.locationUnavailabilityInfoMap[endpoint]; ok { + wasAlreadyUnavailable = op&info.unavailableOps == op && + time.Since(info.lastCheckTime) < lc.unavailableLocationExpirationTime info.lastCheckTime = now info.unavailableOps |= op lc.locationUnavailabilityInfoMap[endpoint] = info @@ -264,7 +271,7 @@ func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedO unavailableOps: op, } } - return lc.updateLocked(nil, nil, nil, nil) + return wasAlreadyUnavailable, lc.updateLocked(nil, nil, nil, nil) } func (lc *locationCache) databaseAccountRead(dbAcct accountProperties) error { diff --git a/sdk/data/azcosmos/cosmos_location_cache_test.go b/sdk/data/azcosmos/cosmos_location_cache_test.go index d8588b38b42b..0ddf9b82b95b 100644 --- a/sdk/data/azcosmos/cosmos_location_cache_test.go +++ b/sdk/data/azcosmos/cosmos_location_cache_test.go @@ -86,7 +86,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { lc := ResetLocationCache() var firstCheckTime time.Time // mark endpoint unavailable for first time - err := lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -103,7 +103,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { } // mark endpoint unavailable for second time time.Sleep(100 * time.Millisecond) - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error 
marking endpoint unavailable: %s", err.Error()) } @@ -123,7 +123,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { func TestRefreshStaleEndpoints(t *testing.T) { lc := ResetLocationCache() // mark endpoint unavailable for first time - err := lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -142,11 +142,11 @@ func TestRefreshStaleEndpoints(t *testing.T) { func TestIsEndpointUnavailable(t *testing.T) { lc := ResetLocationCache() - err := lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - err = lc.markEndpointUnavailableForWrite(*loc2Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc2Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -254,7 +254,7 @@ func TestGetPrefAvailableEndpoints(t *testing.T) { t.Fatalf("Received error Reading DB account: %s", err.Error()) } // marks loc1 unavailable, which will put it last in the preferred available endpoint list - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -304,7 +304,7 @@ func TestReadEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - err = lc.markEndpointUnavailableForRead(*loc2Endpoint) + _, err = lc.markEndpointUnavailableForRead(*loc2Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -354,7 +354,7 @@ func TestWriteEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err 
= lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) }
From e4cd0a3d102f6f35a01231c05d951124a948e4db Mon Sep 17 00:00:00 2001
From: Tomas Varon
Date: Tue, 19 May 2026 20:16:37 -0700
Subject: [PATCH 3/8] azcosmos: address PR Deep Reviewer findings (data-plane stall, waiter ctx, async panic)

Follow-up to PR #26815 / issue #25468 addressing findings from the PR
Deep Reviewer pass.

BLOCKING finding addressed:

* invalidate() zeroed lastUpdateTime, which made populated() return
  false. If the post-invalidate refresh then failed, every subsequent
  request was blocked for refreshTimeInterval (5 min default) by
  populated()==false + cached error -- even though the locationCache
  still held a valid regional topology. This was a regression
  introduced by this PR. Fix: add everPopulated bool, set true on first
  successful refresh and never reset. populated() reads everPopulated
  instead of the timestamp. lastUpdateErr is now only surfaced to
  throttled callers while the GEM has never been populated; once it has
  been populated, requests keep routing through the cached topology
  while a refresh attempt is throttled after a failure. New regression
  test TestFix7_InvalidateThenRefreshFailureDoesNotStallDataPlane locks
  in the behaviour.

Recommendations addressed:

* gem.Update waiters now select between flight completion and
  ctx.Done(), so a caller-side timeout cannot be exceeded by an
  unrelated stuck refresh on a different goroutine. New regression test
  TestFix8_UpdateWaiterRespectsContextCancellation.
* Async refresh goroutine in globalEndpointManagerPolicy.Do now
  recovers panics, so a panic re-raised by gem.Update's panic-safe
  defer (added in the previous commit) cannot bring down the host
  process via the detached goroutine.

Test:

* Tightened TestFix3c bound from <=2 to ==1 -- the combination of the
  atomic check-and-mark and the in-flight singleflight makes the upper
  bound provably 1 HTTP call for the tested fixture.

All 14 fix tests pass with -race; gofmt and vet clean.
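
Illustrative sketch (not part of the patch): the waiter behaviour described
in this commit message, where a waiter selects between flight completion and
ctx.Done(), can be reduced to the standalone program below. The names
refresher, flight, fetch and demo are hypothetical and chosen only for this
example; the real globalEndpointManager also tracks timestamps, an
invalidation generation and panic recovery, all omitted here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// flight is one in-progress refresh; waiters block on done, then read err.
type flight struct {
	done chan struct{}
	err  error
}

// refresher is a hypothetical stand-in for the endpoint manager.
type refresher struct {
	mu       sync.Mutex
	inflight *flight
	calls    int // number of times fetch was actually invoked
	fetch    func(ctx context.Context) error
}

// Update coalesces concurrent callers onto a single fetch. A waiter
// returns early with ctx.Err() if its own context ends first.
func (r *refresher) Update(ctx context.Context) error {
	r.mu.Lock()
	if f := r.inflight; f != nil {
		r.mu.Unlock()
		select {
		case <-f.done:
			return f.err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// We are the leader: publish the flight, then fetch without the lock.
	f := &flight{done: make(chan struct{})}
	r.inflight = f
	r.calls++
	r.mu.Unlock()

	err := r.fetch(ctx)

	r.mu.Lock()
	f.err = err
	r.inflight = nil
	r.mu.Unlock()
	close(f.done)
	return err
}

// demo starts a leader whose fetch blocks until released, then runs a
// waiter with a short deadline while the leader is still in flight.
func demo() (waiterTimedOut, leaderOK bool, calls int) {
	started := make(chan struct{})
	release := make(chan struct{})
	r := &refresher{fetch: func(ctx context.Context) error {
		close(started) // the in-flight slot is already claimed here
		<-release
		return nil
	}}

	leaderErr := make(chan error, 1)
	go func() { leaderErr <- r.Update(context.Background()) }()
	<-started

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	werr := r.Update(ctx)

	close(release)
	lerr := <-leaderErr

	r.mu.Lock()
	calls = r.calls
	r.mu.Unlock()
	return errors.Is(werr, context.DeadlineExceeded), lerr == nil, calls
}

func main() {
	timedOut, leaderOK, calls := demo()
	fmt.Println("waiter timed out:", timedOut, "leader ok:", leaderOK, "fetch calls:", calls)
}
```

The waiter gives up on its own deadline while the leader's fetch keeps
running, so the single fetch is not cancelled on the waiter's behalf.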
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azcosmos/cosmos_dbaccount_refresh_test.go | 147 ++++++++++++++++-- .../cosmos_global_endpoint_manager.go | 51 ++++-- .../cosmos_global_endpoint_manager_policy.go | 6 +- 3 files changed, 181 insertions(+), 23 deletions(-) diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go index d46d16b63f4f..e052e55fc774 100644 --- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -36,11 +36,12 @@ import ( // GetDatabaseAccount behaviour. body, when set, is returned as the response // body. type countingTransport struct { - count atomic.Int64 - status int - body []byte - respErr error - delay time.Duration + count atomic.Int64 + status int + body []byte + respErr error + delay time.Duration + respFunc func() (int, []byte) // when non-nil, overrides status/body per call } func (c *countingTransport) Do(req *http.Request) (*http.Response, error) { @@ -51,10 +52,14 @@ func (c *countingTransport) Do(req *http.Request) (*http.Response, error) { if c.respErr != nil { return nil, c.respErr } + status, body := c.status, c.body + if c.respFunc != nil { + status, body = c.respFunc() + } resp := &http.Response{ - StatusCode: c.status, + StatusCode: status, Header: http.Header{"Content-Type": []string{"application/json"}}, - Body: jsonBody(c.body), + Body: jsonBody(body), Request: req, } return resp, nil @@ -270,11 +275,18 @@ func TestFix3c_ConcurrentSameEndpointMarksAreBounded(t *testing.T) { // With the atomic check-and-mark in markEndpointUnavailable, only the // FIRST goroutine to win the mapMutex Lock observes wasAlreadyUnavailable // == false and triggers invalidate(). All other markers see true and - // skip the invalidation. The leader's refresh + at most one - // post-invalidation refresh therefore bounds the total to 2. + // skip the invalidation. 
Combined with the in-flight singleflight in + // gem.Update, the upper bound is provably 1 HTTP call -- the leader's + // refresh handles the single invalidation, and waiters share its result. + // If invalidate were to land AFTER the leader started its flight + // (impossible here because the test fixture starts with a non-stale + // lastUpdateTime and the marker is the trigger for the first refresh), + // invalidationGen would advance mid-flight and a second leader would + // fire -- so 2 is the theoretical maximum across all timings. We + // assert the tight bound to guard against regressions in atomicity. calls := transport.count.Load() - require.LessOrEqual(t, calls, int64(2), - "concurrent same-endpoint marks must produce a tightly-bounded number of GEM calls (got %d for concurrency=%d)", calls, concurrency) + require.Equal(t, int64(1), calls, + "concurrent same-endpoint marks must collapse to exactly 1 GEM call (got %d for concurrency=%d)", calls, concurrency) } // to every request while the GEM has never been successfully populated, not @@ -634,3 +646,116 @@ func TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion(t *testing.T) "reads must route to a read region even when there are zero write regions; got default at idx=%d", idx) } } + +// ---------------------------------------------------------------------------- +// F7: a transient post-invalidation refresh failure must NOT stall the data +// plane for the full refreshTimeInterval. Once the GEM has ever been +// populated, requests should keep routing through the cached topology even +// while a refresh attempt is throttled after a failure. This guards the +// regression flagged by the deep reviewer where invalidate() zeroed +// lastUpdateTime, which in turn made populated() return false and caused +// the policy's bootstrap path to surface the cached error on every request. 
+// ---------------------------------------------------------------------------- +func TestFix7_InvalidateThenRefreshFailureDoesNotStallDataPlane(t *testing.T) { + // Programmable transport: succeed once (bootstrap), then fail. + responses := []int{http.StatusOK, http.StatusBadRequest, http.StatusBadRequest} + idx := atomic.Int32{} + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}}, + }) + transport := &countingTransport{} + transport.respFunc = func() (int, []byte) { + i := idx.Add(1) - 1 + if int(i) < len(responses) { + status := responses[i] + if status == http.StatusOK { + return status, body + } + return status, nil + } + return http.StatusBadRequest, nil + } + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + // 1. Bootstrap succeeds. + r1, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r1.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r1) + require.NoError(t, err, "bootstrap must succeed") + require.True(t, gem.populated(), "GEM must be populated after a successful bootstrap") + + // 2. Simulate a regional 403 -> MarkEndpointUnavailableForWrite -> invalidate. + west, _ := url.Parse("https://west-us.documents.azure.com:443/") + require.NoError(t, gem.MarkEndpointUnavailableForWrite(*west)) + + // 3. populated() must remain true even though lastUpdateTime is zero. 
+ require.True(t, gem.populated(), + "populated() must remain true after invalidate() -- otherwise a transient refresh failure stalls the data plane") + + // 4. A subsequent data-plane request must succeed (routing through the cached + // topology) even if the post-invalidation refresh attempt fails. + r2, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r2.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err = pl.Do(r2) + require.NoError(t, err, + "data-plane request must succeed via cached topology even when post-invalidate refresh fails") +} + +// ---------------------------------------------------------------------------- +// F8: a waiter on an in-flight refresh must respect its own ctx deadline, +// not block for the leader's full HTTP round-trip duration. This matters +// because clientRetryPolicy's GEM-refresh calls pass req.Raw().Context() +// (a cancellable user context), so a caller-side timeout must take effect +// promptly even when the user happens to be a waiter rather than the leader. +// ---------------------------------------------------------------------------- +func TestFix8_UpdateWaiterRespectsContextCancellation(t *testing.T) { + // Slow leader: holds the in-flight slot for 2 seconds. + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 2 * time.Second} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // Leader starts a refresh in the background. + leaderDone := make(chan struct{}) + go func() { + defer close(leaderDone) + _ = gem.Update(context.Background(), false) + }() + + // Wait until the leader is actually in-flight. 
+ deadline := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadline) { + gem.gemMutex.Lock() + inflight := gem.inflight != nil + gem.gemMutex.Unlock() + if inflight { + break + } + time.Sleep(5 * time.Millisecond) + } + + // Waiter has a 100 ms deadline. It must return promptly with the + // deadline error rather than blocking for the leader's 2-second HTTP + // call to complete. + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + start := time.Now() + err := gem.Update(ctx, false) + waited := time.Since(start) + + require.ErrorIs(t, err, context.DeadlineExceeded, "waiter must surface ctx error") + require.Less(t, waited, 500*time.Millisecond, + "waiter must respect ctx deadline -- waited %v but ctx deadline was 100ms", waited) + + // Clean up: let the leader finish so the test doesn't leak the goroutine. + <-leaderDone +} diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index cbe2a8bccc81..9922c805c778 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -38,6 +38,14 @@ type globalEndpointManager struct { // ever been successfully populated, so a chronic bootstrap failure is // surfaced on every request rather than silently swallowed. lastUpdateErr error + // everPopulated is set true on the first successful refresh and never + // reset, even when invalidate() zeroes lastUpdateTime to force a fresh + // refresh. populated() reads this flag rather than the timestamp, so a + // transient post-invalidation refresh failure does not stall the data + // plane: requests can still route through the existing locationCache + // topology while the next refresh attempt is pending the throttle + // window. See issue #25468 deep-review finding. 
+ everPopulated bool // inflight coalesces concurrent Update callers: only the first does the // HTTP call; the rest wait on the per-flight done channel and read // per-flight err. Each refresh has its own *updateFlight so late waiters @@ -148,14 +156,17 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() { gem.locationCache.refreshStaleEndpoints() } -// populated reports whether the GEM has been successfully refreshed at least -// once. Used by the policy's bootstrap path to decide whether the first -// request on a new client should synchronously wait for GetDatabaseAccount. -// Repeated calls after the first success are cheap (one mutex acquisition). +// populated reports whether the GEM has ever been successfully populated. +// Unlike a check of !lastUpdateTime.IsZero(), this remains true after +// invalidate() zeroes the timestamps -- so a transient post-invalidation +// refresh failure does not stall the data plane: the policy can still +// route requests through the existing locationCache topology while the +// next refresh attempt waits for the throttle window. See issue #25468 +// deep-review finding. func (gem *globalEndpointManager) populated() bool { gem.gemMutex.Lock() defer gem.gemMutex.Unlock() - return !gem.lastUpdateTime.IsZero() + return gem.everPopulated } func (gem *globalEndpointManager) ShouldRefresh() bool { @@ -190,13 +201,25 @@ func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, reso // If the GEM has never been successfully populated and the throttle is // active, Update returns the cached error from the most recent failed // attempt so a chronic bootstrap failure is surfaced on every request -// rather than silently swallowed. +// rather than silently swallowed. Once the GEM HAS been populated, a +// throttled Update returns nil even after a subsequent refresh failure -- +// the data plane keeps using the cached topology until the throttle expires +// and a fresh refresh can run. 
+// +// Update respects ctx cancellation in both leader and waiter roles. The +// leader's HTTP call uses ctx directly (or a derived context inside +// GetAccountProperties). Waiters select between flight completion and +// ctx.Done() so a caller-side timeout cannot be exceeded by an unrelated +// stuck refresh. func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) error { gem.gemMutex.Lock() if !gem.shouldRefresh() && !forceRefresh { - // Throttled. Surface the cached error if we have never succeeded. + // Throttled. Surface the cached error only if we have NEVER + // successfully populated the GEM -- otherwise the data plane has + // a valid cached topology and should continue working until the + // next refresh attempt succeeds. var cached error - if gem.lastUpdateTime.IsZero() { + if !gem.everPopulated { cached = gem.lastUpdateErr } gem.gemMutex.Unlock() @@ -206,11 +229,16 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) // Another goroutine is performing a refresh. Wait for it and share // its result rather than spawning a duplicate HTTP call. The result // lives on the per-flight struct so subsequent flights cannot - // overwrite it. + // overwrite it. Honour the waiter's ctx so a caller-side timeout + // is not extended by the leader's HTTP call duration. flight := gem.inflight gem.gemMutex.Unlock() - <-flight.done - return flight.err + select { + case <-flight.done: + return flight.err + case <-ctx.Done(): + return ctx.Err() + } } // We are the leader. 
Publish the inflight flight and snapshot the // invalidation generation, then release the lock while we perform the @@ -242,6 +270,7 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) gem.lastAttemptTime = time.Now() if err == nil { gem.lastUpdateTime = gem.lastAttemptTime + gem.everPopulated = true } } // If invalidationGen changed, leave the timestamps untouched so diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go index eba333f8791c..7960dfe76125 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go @@ -34,7 +34,11 @@ func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, e if p.gem.ShouldRefresh() { go func() { // Concurrent goroutines spawned here are coalesced inside - // gem.Update via the single-in-flight pattern. + // gem.Update via the single-in-flight pattern. gem.Update's + // panic-safe defer re-panics after cleanup; recover here so a + // panic in the GEM pipeline does not bring down the host + // process via this detached goroutine. + defer func() { _ = recover() }() _ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) }() } From 3e82009972e2ebedd7ce851fe09d75ea7a683b0e Mon Sep 17 00:00:00 2001 From: Tomas Varon Date: Tue, 19 May 2026 22:47:04 -0700 Subject: [PATCH 4/8] azcosmos: Go best-practices polish from second deep-review pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Idiomatic improvements identified by the PR Deep Reviewer's Go-craft pass. No functional behaviour changes. cosmos_global_endpoint_manager.go: * everPopulated is now atomic.Bool, so the policy's hot-path populated() check is lock-free instead of contending on gemMutex against Update's critical section. 
* Hoisted the panic-safe defer out of an IIFE in Update -- a plain defer
  at function scope has identical semantics with less ceremony.
* Wrap GetAccountProperties / locationCache.update errors with %w so
  callers can errors.As / errors.Is the underlying response error when
  the cached error is surfaced via lastUpdateErr.
* Added hasInflight() test helper so tests don't peek at
  gemMutex/inflight directly; keeps the test surface stable across
  future refactors of the internal sync primitives.
* One-line comment documenting that lastUpdateErr is intentionally
  shared across force=true and force=false callers.

cosmos_global_endpoint_manager_policy.go:

* Gate the async refresh goroutine spawn with an atomic.Bool CAS.
  Without this, every request that observed ShouldRefresh()==true
  during a burst spawned its own goroutine that then queued as a waiter
  inside gem.Update. Now N→1 per refresh-window.
* Recovered panic is logged via the azcore log facility instead of
  being silently dropped, so production crashes remain triageable.
  Captures the stack trace too.

cosmos_dbaccount_refresh_test.go:

* TestFix1b: replaced racy 30ms time.Sleep with require.Eventually
  polling the new hasInflight() helper.
* TestFix8: replaced manual mutex poll loop with require.Eventually +
  hasInflight().
* TestFix2: include goroutine index in the failure message so a
  sporadic single-goroutine failure is easier to debug.
* Gated the 500-goroutine soak tests and the 2-second TestFix8 behind
  testing.Short() so the dev-loop cost is opt-in.

All tests pass with -race; -short also passes (skips three slow tests as
designed). gofmt and vet clean.
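
Illustrative sketch (not part of the patch): the atomic.Bool CAS gate on the
async refresh spawn mentioned in this commit message could look roughly like
the standalone program below. refreshGate, tryStart and demo are
hypothetical names used only for this example; the actual policy code may be
structured differently and additionally recovers and logs panics.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// refreshGate allows at most one background task at a time.
type refreshGate struct {
	running atomic.Bool
}

// tryStart runs fn on a new goroutine only if no gated goroutine is
// currently running. It reports whether a goroutine was spawned.
func (g *refreshGate) tryStart(fn func()) bool {
	if !g.running.CompareAndSwap(false, true) {
		return false // another caller already owns the slot
	}
	go func() {
		defer g.running.Store(false) // reopen the gate when fn returns
		fn()
	}()
	return true
}

// demo fires `callers` concurrent tryStart calls while the gated task
// is held open, and returns how many of them actually spawned it.
func demo(callers int) int64 {
	var g refreshGate
	var spawned atomic.Int64
	release := make(chan struct{})
	finished := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			started := g.tryStart(func() {
				<-release // hold the slot until every caller has tried
				close(finished)
			})
			if started {
				spawned.Add(1)
			}
		}()
	}
	wg.Wait()
	close(release)
	<-finished
	return spawned.Load()
}

func main() {
	fmt.Println("goroutines spawned:", demo(100))
}
```

Callers that lose the compare-and-swap return immediately instead of
queueing as waiters, which is the N→1 reduction the bullet above describes.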
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azcosmos/cosmos_dbaccount_refresh_test.go | 35 ++++---- .../cosmos_global_endpoint_manager.go | 83 +++++++++++-------- .../cosmos_global_endpoint_manager_policy.go | 34 ++++++-- 3 files changed, 95 insertions(+), 57 deletions(-) diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go index e052e55fc774..f066fa584ed2 100644 --- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -143,8 +143,8 @@ func TestFix2_ConcurrentUpdateCallersCoalesce(t *testing.T) { }(i) } wg.Wait() - for _, err := range errs { - require.NoError(t, err) + for i, err := range errs { + require.NoErrorf(t, err, "goroutine %d failed", i) } require.Equal(t, int64(1), transport.count.Load(), "concurrent Update callers must coalesce into a single HTTP call") @@ -216,7 +216,11 @@ func TestFix1b_InvalidateDuringInflightRefreshIsHonored(t *testing.T) { defer wg.Done() _ = gem.Update(context.Background(), false) }() - time.Sleep(30 * time.Millisecond) // ensure the leader has the inflight slot + // Wait deterministically for the leader to claim the in-flight slot. + // A naive time.Sleep is racy on loaded CI hosts; polling hasInflight() + // confirms the leader has actually entered refreshOnce. + require.Eventually(t, gem.hasInflight, time.Second, 2*time.Millisecond, + "leader must claim the in-flight slot within 1s") gem.invalidate() wg.Wait() @@ -406,6 +410,9 @@ func TestFix5_ReadEndpointsDoesNotDeadlock(t *testing.T) { // This is the headline regression guard for issue #25468. 
// ---------------------------------------------------------------------------- func TestRegression25468_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + if testing.Short() { + t.Skip("500-goroutine soak; skipped under -short") + } body, _ := json.Marshal(accountProperties{ ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, @@ -442,6 +449,9 @@ func TestRegression25468_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { // exactly one HTTP call, demonstrating that F1 + F2 together close the // failure-storm path. func TestRegression25468_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + if testing.Short() { + t.Skip("500-goroutine soak; skipped under -short") + } transport := &countingTransport{status: http.StatusBadRequest, delay: 20 * time.Millisecond} gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) pol := &globalEndpointManagerPolicy{gem: gem} @@ -716,6 +726,9 @@ func TestFix7_InvalidateThenRefreshFailureDoesNotStallDataPlane(t *testing.T) { // promptly even when the user happens to be a waiter rather than the leader. // ---------------------------------------------------------------------------- func TestFix8_UpdateWaiterRespectsContextCancellation(t *testing.T) { + if testing.Short() { + t.Skip("blocks on a 2s leader by design; skipped under -short") + } // Slow leader: holds the in-flight slot for 2 seconds. body, _ := json.Marshal(accountProperties{ ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, @@ -731,17 +744,11 @@ func TestFix8_UpdateWaiterRespectsContextCancellation(t *testing.T) { _ = gem.Update(context.Background(), false) }() - // Wait until the leader is actually in-flight. 
- deadline := time.Now().Add(500 * time.Millisecond) - for time.Now().Before(deadline) { - gem.gemMutex.Lock() - inflight := gem.inflight != nil - gem.gemMutex.Unlock() - if inflight { - break - } - time.Sleep(5 * time.Millisecond) - } + // Wait until the leader is actually in-flight via the hasInflight test + // helper rather than peeking at internal mutex state -- keeps the test + // resilient to changes in the GEM's internal synchronization primitives. + require.Eventually(t, gem.hasInflight, 500*time.Millisecond, 5*time.Millisecond, + "leader must claim the in-flight slot within 500ms") // Waiter has a 100 ms deadline. It must return promptly with the // deadline error rather than blocking for the leader's 2-second HTTP diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index 9922c805c778..ea1b588bb8fa 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "time" azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" @@ -44,8 +45,9 @@ type globalEndpointManager struct { // transient post-invalidation refresh failure does not stall the data // plane: requests can still route through the existing locationCache // topology while the next refresh attempt is pending the throttle - // window. See issue #25468 deep-review finding. - everPopulated bool + // window. See issue #25468 deep-review finding. Stored as atomic.Bool + // so the hot-path policy check is lock-free. + everPopulated atomic.Bool // inflight coalesces concurrent Update callers: only the first does the // HTTP call; the rest wait on the per-flight done channel and read // per-flight err. 
Each refresh has its own *updateFlight so late waiters @@ -162,11 +164,20 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() { // refresh failure does not stall the data plane: the policy can still // route requests through the existing locationCache topology while the // next refresh attempt waits for the throttle window. See issue #25468 -// deep-review finding. +// deep-review finding. Lock-free atomic read so the policy's hot path +// does not contend on gemMutex. func (gem *globalEndpointManager) populated() bool { + return gem.everPopulated.Load() +} + +// hasInflight is a test-only accessor for the in-flight refresh slot. +// Tests use it to wait until a leader has claimed the slot before firing +// follow-up calls; keeping the field access inside the GEM means tests +// don't need to grab gemMutex directly. +func (gem *globalEndpointManager) hasInflight() bool { gem.gemMutex.Lock() defer gem.gemMutex.Unlock() - return gem.everPopulated + return gem.inflight != nil } func (gem *globalEndpointManager) ShouldRefresh() bool { @@ -217,9 +228,11 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) // Throttled. Surface the cached error only if we have NEVER // successfully populated the GEM -- otherwise the data plane has // a valid cached topology and should continue working until the - // next refresh attempt succeeds. + // next refresh attempt succeeds. The cached error is shared across + // force=true and force=false callers: both want to surface + // "bootstrap is broken" and there's no caller-visible distinction. var cached error - if !gem.everPopulated { + if !gem.everPopulated.Load() { cached = gem.lastUpdateErr } gem.gemMutex.Unlock() @@ -255,36 +268,34 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) // subsequent Update caller blocks forever on <-flight.done. We capture // any panic, record it as the flight error, and re-panic after cleanup. 
var err error - func() { - defer func() { - r := recover() - gem.gemMutex.Lock() - if r != nil && err == nil { - err = fmt.Errorf("panic in GEM refresh: %v", r) - } - flight.err = err - gem.lastUpdateErr = err - if gem.invalidationGen == genAtStart { - // No invalidation occurred during the flight, so commit the - // timestamps and let the throttle take effect. - gem.lastAttemptTime = time.Now() - if err == nil { - gem.lastUpdateTime = gem.lastAttemptTime - gem.everPopulated = true - } - } - // If invalidationGen changed, leave the timestamps untouched so - // the next caller observes shouldRefresh()==true and performs a - // fresh refresh that reflects the post-invalidation state. - gem.inflight = nil - gem.gemMutex.Unlock() - close(flight.done) - if r != nil { - panic(r) + defer func() { + r := recover() + gem.gemMutex.Lock() + if r != nil && err == nil { + err = fmt.Errorf("panic in GEM refresh: %v", r) + } + flight.err = err + gem.lastUpdateErr = err + if gem.invalidationGen == genAtStart { + // No invalidation occurred during the flight, so commit the + // timestamps and let the throttle take effect. + gem.lastAttemptTime = time.Now() + if err == nil { + gem.lastUpdateTime = gem.lastAttemptTime + gem.everPopulated.Store(true) } - }() - err = gem.refreshOnce(ctx) + } + // If invalidationGen changed, leave the timestamps untouched so + // the next caller observes shouldRefresh()==true and performs a + // fresh refresh that reflects the post-invalidation state. 
+ gem.inflight = nil + gem.gemMutex.Unlock() + close(flight.done) + if r != nil { + panic(r) + } }() + err = gem.refreshOnce(ctx) return err } @@ -293,7 +304,7 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) func (gem *globalEndpointManager) refreshOnce(ctx context.Context) error { accountProperties, err := gem.GetAccountProperties(ctx) if err != nil { - return fmt.Errorf("failed to retrieve account properties: %v", err) + return fmt.Errorf("failed to retrieve account properties: %w", err) } if err := gem.locationCache.update( accountProperties.WriteRegions, @@ -301,7 +312,7 @@ func (gem *globalEndpointManager) refreshOnce(ctx context.Context) error { gem.preferredLocations, &accountProperties.EnableMultipleWriteLocations, ); err != nil { - return fmt.Errorf("failed to update location cache: %v", err) + return fmt.Errorf("failed to update location cache: %w", err) } return nil } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go index 7960dfe76125..cff09569fc58 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go @@ -6,12 +6,25 @@ package azcosmos import ( "context" "net/http" + "runtime/debug" + "sync/atomic" + azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" ) type globalEndpointManagerPolicy struct { gem *globalEndpointManager + // asyncRefreshPending gates the spawn of the async refresh goroutine. + // Without this gate every request that observes ShouldRefresh()==true + // (potentially thousands during a burst that arrives right as the + // throttle expires) would spawn its own goroutine, each of which would + // then queue as a waiter inside gem.Update. 
The singleflight in Update + // collapses them to one HTTP call, but the goroutine + select overhead + // is wasted. CAS this to true before spawning; the goroutine clears it + // on exit. + asyncRefreshPending atomic.Bool } func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, error) { @@ -31,14 +44,21 @@ func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, e // triggering request. err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) } - if p.gem.ShouldRefresh() { + if p.gem.ShouldRefresh() && p.asyncRefreshPending.CompareAndSwap(false, true) { go func() { - // Concurrent goroutines spawned here are coalesced inside - // gem.Update via the single-in-flight pattern. gem.Update's - // panic-safe defer re-panics after cleanup; recover here so a - // panic in the GEM pipeline does not bring down the host - // process via this detached goroutine. - defer func() { _ = recover() }() + defer p.asyncRefreshPending.Store(false) + // gem.Update's panic-safe defer re-panics after cleanup. We + // recover here so a panic in the GEM pipeline does not bring + // down the host process via this detached goroutine. The + // recovered value is logged (rather than silently dropped) so + // production crashes remain triageable. + defer func() { + if r := recover(); r != nil { + log.Writef(azlog.EventResponse, + "panic in azcosmos GEM async refresh: %v\n%s", + r, debug.Stack()) + } + }() _ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) }() } From 2929d08e41b8801a3c7f847eebc8b7e20659eeb1 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 21 May 2026 17:43:58 -0700 Subject: [PATCH 5/8] azcosmos: revert cross-region/forceRefresh behavior, address deep-review #1,#2,#4,#13, comment cleanup Behavioral reverts (preserve long-standing semantics): - cosmos_client_retry_policy.go: write-retry path uses gem.Update(ctx, true) to force-refresh per attempt (intentional; restores prior behavior). 
- cosmos_location_cache.go: resolveServiceEndpoint with
  enableCrossRegionRetries=false returns lc.defaultEndpoint for
  single-master writes (avoids a breaking change).

Fixes addressing prior deep-review findings:
- cosmos_global_endpoint_manager_policy.go:
  * Log async refresh failures (was discarded via '_ ='); chronic
    post-bootstrap topology drift is now observable.
  * Capture refreshCtx via context.WithoutCancel BEFORE launching the
    goroutine so it does not depend on req's lifetime.
- cosmos_global_endpoint_manager.go: leader's HTTP call runs under
  context.WithoutCancel(ctx); waiters still respect their own ctx so
  caller-side cancellation no longer poisons coalesced waiters.
- cosmos_location_cache.go markEndpointUnavailable: fast-path skips the
  full updateLocked recompute when wasAlreadyUnavailable==true (route
  lists cannot change in that case).

Tests and docs:
- TestFix3 renamed/inverted to TestFix3_WriteRetryForceRefreshesGEM.
- TestDefaultEndpointElim_CrossRegionRetriesDisabled* renamed/inverted
  to assert the restored default-endpoint behavior.
- TestRegression25468_* renamed to TestRegression_*.
- Comments stripped of #25468 references and tightened.
- CHANGELOG updated to link to PR 26815 and reflect the reverted
  cross-region-retries=false behavior.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/data/azcosmos/CHANGELOG.md | 4 +- .../azcosmos/cosmos_client_retry_policy.go | 8 +--- .../azcosmos/cosmos_dbaccount_refresh_test.go | 38 +++++++-------- .../cosmos_global_endpoint_manager.go | 48 ++++++++----------- .../cosmos_global_endpoint_manager_policy.go | 17 +++++-- sdk/data/azcosmos/cosmos_location_cache.go | 39 +++++++-------- .../azcosmos/cosmos_location_cache_test.go | 14 +++--- 7 files changed, 76 insertions(+), 92 deletions(-) diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md index 66a2ce4c43f0..9659f725b079 100644 --- a/sdk/data/azcosmos/CHANGELOG.md +++ b/sdk/data/azcosmos/CHANGELOG.md @@ -14,8 +14,8 @@ ### Bugs Fixed -* Fixed excessive `GetDatabaseAccount` (region topology) HTTP calls when using preferred regions. Previously a failed refresh did not advance the throttle, concurrent callers each spawned a goroutine that issued its own HTTP call, write retries on `403/WriteForbidden` force-refreshed the global endpoint manager on every retry attempt, and a failed bootstrap could pin the client into a permanent refresh storm. Concurrent refreshes are now coalesced via a single in-flight pattern, failures honour the same throttle as successes, write retries refresh at most once per newly-unavailable endpoint, and a chronic bootstrap failure surfaces the cached error rather than retrying on every request. Also fixed a self-deadlock in `locationCache.readEndpoints`/`writeEndpoints` on the stale-endpoints refresh path. See [issue 25468](https://github.com/Azure/azure-sdk-for-go/issues/25468). -* Data-plane requests no longer route to the customer-supplied (default) endpoint as a fallback once the account topology is populated. 
Previously every route list trailed into the default endpoint, so retry traversal eventually issued data-plane requests there even when full regional metadata was available; `enableCrossRegionRetries=false` also caused single-master writes to fall back to the default endpoint instead of pinning to a regional one. The only remaining data-plane code path that targets the default endpoint is the degenerate case of an account advertising zero write regions on a write request. See [issue 25468](https://github.com/Azure/azure-sdk-for-go/issues/25468). +* Fixed excessive `GetDatabaseAccount` (region topology) HTTP calls when using preferred regions. Previously a failed refresh did not advance the throttle, concurrent callers each spawned a goroutine that issued its own HTTP call, and a failed bootstrap could pin the client into a permanent refresh storm. Concurrent refreshes are now coalesced via a single in-flight pattern, failures honour the same throttle as successes, and a chronic bootstrap failure surfaces the cached error rather than retrying on every request. Also fixed a self-deadlock in `locationCache.readEndpoints`/`writeEndpoints` on the stale-endpoints refresh path. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815). +* Data-plane retries no longer trail into the customer-supplied (default) endpoint once the account topology is populated. Previously every preferred route list ended with the default endpoint, so retry traversal eventually issued data-plane requests there even when full regional metadata was available. Route lists now contain only regional endpoints. `resolveServiceEndpoint` behavior is unchanged: `enableCrossRegionRetries=false` still routes single-master writes to the default endpoint, and accounts advertising zero write regions still fall back to it for writes. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815). 
### Other Changes diff --git a/sdk/data/azcosmos/cosmos_client_retry_policy.go b/sdk/data/azcosmos/cosmos_client_retry_policy.go index 76637336da07..11f9646b951c 100644 --- a/sdk/data/azcosmos/cosmos_client_retry_policy.go +++ b/sdk/data/azcosmos/cosmos_client_retry_policy.go @@ -142,13 +142,7 @@ func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, i } } - // Pass forceRefresh=false. The MarkEndpointUnavailable* calls above - // already invalidate the GEM cache the first time an endpoint becomes - // unavailable, so the next Update will actually issue a refresh. On - // subsequent retries within the unavailability window we honour the - // 5-min throttle instead of forcing a fresh GetDatabaseAccount call per - // attempt (https://github.com/Azure/azure-sdk-for-go/issues/25468). - err := p.gem.Update(req.Raw().Context(), false) + err := p.gem.Update(req.Raw().Context(), true) if err != nil { return false, err } diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go index f066fa584ed2..742b91001f4c 100644 --- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -1,14 +1,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -// Regression tests for the fixes applied for -// https://github.com/Azure/azure-sdk-for-go/issues/25468 -- excess -// GetDatabaseAccount calls observed with preferred regions configured. +// Regression tests for excess GetDatabaseAccount calls observed with +// preferred regions configured. 
// // These tests cover: // F1: failed GEM Update is throttled to refreshTimeInterval (lastAttemptTime) // F2: concurrent Update callers are coalesced into a single HTTP call -// F3: write-retry on 403 issues at most one GEM call per logical request +// F3: write-retry on 403/WriteForbidden force-refreshes the GEM per attempt // F4: a failed initial bootstrap Update is retried on the next request // F5: locationCache.readEndpoints does not deadlock on the stale+unavailable path // Soak: under sustained mixed load, total GEM calls respect refreshTimeInterval @@ -151,11 +150,10 @@ func TestFix2_ConcurrentUpdateCallersCoalesce(t *testing.T) { } // ---------------------------------------------------------------------------- -// F3: write-retry on 403/WriteForbidden invalidates the GEM exactly once -// (when the endpoint is newly marked unavailable), then subsequent retries -// within the unavailability window are throttled. +// F3: write-retry on 403/WriteForbidden force-refreshes the GEM on every +// retry attempt so the client picks up topology changes immediately. 
// ---------------------------------------------------------------------------- -func TestFix3_WriteRetryIssuesAtMostOneGEMCall(t *testing.T) { +func TestFix3_WriteRetryForceRefreshesGEM(t *testing.T) { defaultEndpoint, err := url.Parse("https://fake.documents.azure.com:443/") require.NoError(t, err) @@ -189,8 +187,8 @@ func TestFix3_WriteRetryIssuesAtMostOneGEMCall(t *testing.T) { require.True(t, shouldRetry) rc.retryCount++ } - require.Equal(t, int64(1), transport.count.Load(), - "write retries within the same unavailability window must collapse to one GEM call") + require.Equal(t, int64(writeRetries), transport.count.Load(), + "write retries on 403/WriteForbidden must force-refresh the GEM on every attempt") } // ---------------------------------------------------------------------------- @@ -407,9 +405,8 @@ func TestFix5_ReadEndpointsDoesNotDeadlock(t *testing.T) { // ---------------------------------------------------------------------------- // Soak test: a high-concurrency burst against a healthy GEM with the default // 5-min refresh interval should issue exactly one GetDatabaseAccount call. -// This is the headline regression guard for issue #25468. // ---------------------------------------------------------------------------- -func TestRegression25468_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { +func TestRegression_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { if testing.Short() { t.Skip("500-goroutine soak; skipped under -short") } @@ -448,7 +445,7 @@ func TestRegression25468_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { // Soak test variant: the same burst against a FAILING GEM must also produce // exactly one HTTP call, demonstrating that F1 + F2 together close the // failure-storm path. 
-func TestRegression25468_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T) { +func TestRegression_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T) { if testing.Short() { t.Skip("500-goroutine soak; skipped under -short") } @@ -481,7 +478,7 @@ func TestRegression25468_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T } // ---------------------------------------------------------------------------- -// Default-endpoint elimination (issue #25468 followup). +// Default-endpoint elimination. // After the GEM is populated, data-plane requests must never resolve to the // customer-supplied endpoint -- with the single exception of the degenerate // "zero write regions" case for write requests. @@ -610,18 +607,17 @@ func TestDefaultEndpointElim_ZeroWriteRegionsRetainsDefaultFallback(t *testing.T "zero-write-regions write must still route to the customer endpoint (documented degenerate case)") } -func TestDefaultEndpointElim_CrossRegionRetriesDisabledStillRoutesRegional(t *testing.T) { +func TestDefaultEndpointElim_CrossRegionRetriesDisabledUsesDefaultEndpoint(t *testing.T) { defaultHost := "customer-endpoint.documents.azure.com:443" gem := makeGEMWithRegions(t, false, []string{"East US"}, false /*enableCrossRegion=false*/) - // With cross-region retries disabled, the primary endpoint must still - // be regional; retries must pin to the same region (locationIndex=0 - // regardless of the caller's retry count). + // With cross-region retries disabled, single-master writes route to the + // default (customer-supplied) endpoint -- preserving the long-standing + // behavior of this flag. 
for idx := 0; idx < 10; idx++ { ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, true, false) - require.NotEqual(t, defaultHost, ep.Host, - "cross-region-retries=false must not fall back to default endpoint at idx=%d", idx) - require.Contains(t, ep.Host, "east-us") + require.Equal(t, defaultHost, ep.Host, + "cross-region-retries=false must route writes to the default endpoint at idx=%d", idx) } } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index ea1b588bb8fa..ec6618604120 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -29,10 +29,8 @@ type globalEndpointManager struct { lastUpdateTime time.Time // lastAttemptTime records the most recent Update attempt regardless of // outcome. shouldRefresh() honours it so a failed GetAccountProperties is - // throttled to refreshTimeInterval just like a successful one. This - // prevents the failure-loop described in - // https://github.com/Azure/azure-sdk-for-go/issues/25468 where every - // caller after a failed Update would re-issue the HTTP call immediately. + // throttled to refreshTimeInterval just like a successful one, preventing + // a failure-loop where every caller re-issues the HTTP call immediately. lastAttemptTime time.Time // lastUpdateErr is the error from the most recent refresh attempt. It is // returned to callers that hit the throttle window before the GEM has @@ -40,13 +38,12 @@ type globalEndpointManager struct { // surfaced on every request rather than silently swallowed. lastUpdateErr error // everPopulated is set true on the first successful refresh and never - // reset, even when invalidate() zeroes lastUpdateTime to force a fresh - // refresh. 
populated() reads this flag rather than the timestamp, so a - // transient post-invalidation refresh failure does not stall the data - // plane: requests can still route through the existing locationCache - // topology while the next refresh attempt is pending the throttle - // window. See issue #25468 deep-review finding. Stored as atomic.Bool - // so the hot-path policy check is lock-free. + // reset, even when invalidate() zeroes lastUpdateTime. populated() reads + // this flag rather than the timestamp, so a transient post-invalidation + // refresh failure does not stall the data plane: requests can still route + // through the existing locationCache topology while the next refresh + // attempt waits for the throttle window. Stored as atomic.Bool so the + // hot-path policy check is lock-free. everPopulated atomic.Bool // inflight coalesces concurrent Update callers: only the first does the // HTTP call; the rest wait on the per-flight done channel and read @@ -159,13 +156,10 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() { } // populated reports whether the GEM has ever been successfully populated. -// Unlike a check of !lastUpdateTime.IsZero(), this remains true after -// invalidate() zeroes the timestamps -- so a transient post-invalidation -// refresh failure does not stall the data plane: the policy can still -// route requests through the existing locationCache topology while the -// next refresh attempt waits for the throttle window. See issue #25468 -// deep-review finding. Lock-free atomic read so the policy's hot path -// does not contend on gemMutex. +// Unlike !lastUpdateTime.IsZero(), this remains true after invalidate() +// zeroes the timestamps, so a transient post-invalidation refresh failure +// does not stall the data plane. Lock-free atomic read keeps the policy +// hot path off gemMutex. 
func (gem *globalEndpointManager) populated() bool { return gem.everPopulated.Load() } @@ -190,7 +184,7 @@ func (gem *globalEndpointManager) shouldRefresh() bool { // Honor whichever happened more recently: a successful update or an // attempt that failed. Failures must be throttled too, otherwise a // failing endpoint causes every caller to re-issue GetDatabaseAccount - // immediately (issue #25468). + // immediately. last := gem.lastUpdateTime if gem.lastAttemptTime.After(last) { last = gem.lastAttemptTime @@ -206,8 +200,7 @@ func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, reso // Concurrent callers are coalesced via a single-in-flight pattern so at most // one HTTP call is in flight per client at any time. Both successes and // failures advance lastAttemptTime so the next refresh is throttled to -// refreshTimeInterval -- this prevents the failure-storm described in -// https://github.com/Azure/azure-sdk-for-go/issues/25468. +// refreshTimeInterval, preventing a failure-storm. // // If the GEM has never been successfully populated and the throttle is // active, Update returns the cached error from the most recent failed @@ -217,11 +210,12 @@ func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, reso // the data plane keeps using the cached topology until the throttle expires // and a fresh refresh can run. // -// Update respects ctx cancellation in both leader and waiter roles. The -// leader's HTTP call uses ctx directly (or a derived context inside -// GetAccountProperties). Waiters select between flight completion and -// ctx.Done() so a caller-side timeout cannot be exceeded by an unrelated -// stuck refresh. +// Update respects ctx cancellation for waiters. The leader's HTTP call runs +// under context.WithoutCancel(ctx) so an unrelated caller-side cancellation +// does not poison the shared flight result for other coalesced waiters +// (GetAccountProperties applies its own 60s timeout). 
Waiters select between +// flight completion and their own ctx.Done() so a caller-side timeout cannot +// be exceeded by an unrelated stuck refresh. func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) error { gem.gemMutex.Lock() if !gem.shouldRefresh() && !forceRefresh { @@ -295,7 +289,7 @@ func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) panic(r) } }() - err = gem.refreshOnce(ctx) + err = gem.refreshOnce(context.WithoutCancel(ctx)) return err } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go index cff09569fc58..1a6bb47c96f6 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go @@ -33,9 +33,8 @@ func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, e // callers are coalesced inside gem.Update via the single-in-flight // pattern, so at most one HTTP call is in flight. If the call fails // the throttle in gem.Update (lastAttemptTime) ensures subsequent - // bootstrap retries respect refreshTimeInterval -- preventing the - // failure storm described in - // https://github.com/Azure/azure-sdk-for-go/issues/25468. + // bootstrap retries respect refreshTimeInterval, preventing a + // failure storm. var err error if !p.gem.populated() { // Use the same context, but without the cancellation signal. @@ -45,6 +44,9 @@ func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, e err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) } if p.gem.ShouldRefresh() && p.asyncRefreshPending.CompareAndSwap(false, true) { + // Capture the context before launching the goroutine so we do not + // depend on req's lifetime after the policy returns. 
+ refreshCtx := context.WithoutCancel(req.Raw().Context()) go func() { defer p.asyncRefreshPending.Store(false) // gem.Update's panic-safe defer re-panics after cleanup. We @@ -59,7 +61,14 @@ func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, e r, debug.Stack()) } }() - _ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) + // Log refresh failures so a chronically failing GEM is + // observable. Without this, post-bootstrap topology drift is + // silent: the data plane keeps routing to the cached topology + // and callers see no signal until requests start failing. + if err := p.gem.Update(refreshCtx, false); err != nil { + log.Writef(azlog.EventResponse, + "azcosmos GEM async refresh failed: %v", err) + } }() } if err != nil { diff --git a/sdk/data/azcosmos/cosmos_location_cache.go b/sdk/data/azcosmos/cosmos_location_cache.go index aefd435795ce..82149501148e 100644 --- a/sdk/data/azcosmos/cosmos_location_cache.go +++ b/sdk/data/azcosmos/cosmos_location_cache.go @@ -116,9 +116,8 @@ func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocati } // Choose regional fallbacks so the route lists never trail into the - // customer-supplied default endpoint. See issue #25468 / followup - // "no data-plane traffic to default endpoint": the only data-plane code - // path that may still hit the default endpoint is the degenerate "zero + // customer-supplied default endpoint. The only data-plane code path + // that may still hit the default endpoint is the degenerate "zero // write regions on a write" case. 
writeFallback := lc.defaultEndpoint if len(nextLoc.availWriteLocations) > 0 { @@ -147,23 +146,11 @@ func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocati func (lc *locationCache) resolveServiceEndpoint(locationIndex int, resourceType resourceType, isWriteOperation, useWriteEndpoint bool) url.URL { if (isWriteOperation || useWriteEndpoint) && !lc.canUseMultipleWriteLocsToRoute(resourceType) { - if len(lc.locationInfo.availWriteLocations) > 0 { - // Prefer a regional endpoint. The cross-region-retries flag only - // gates whether we walk across regions on retry; it must not - // cause the primary endpoint to fall back to the default - // endpoint when regional metadata is available - // (issue #25468 followup: no data-plane traffic to default). - if lc.enableCrossRegionRetries { - locationIndex = min(locationIndex%2, len(lc.locationInfo.availWriteLocations)-1) - } else { - locationIndex = 0 - } + if lc.enableCrossRegionRetries && len(lc.locationInfo.availWriteLocations) > 0 { + locationIndex = min(locationIndex%2, len(lc.locationInfo.availWriteLocations)-1) writeLocation := lc.locationInfo.availWriteLocations[locationIndex] return lc.locationInfo.availWriteEndpointsByLocation[writeLocation] } - // Degenerate case: account metadata advertises zero write regions. - // Per the project decision this is the ONLY data-plane code path - // where we still fall back to the customer-supplied endpoint. return lc.defaultEndpoint } @@ -253,8 +240,7 @@ func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) (wasA // markEndpointUnavailable atomically samples whether the endpoint was already // unavailable for `op` and records the unavailability. Returning the prior // state from inside the same critical section that performs the mark -// eliminates the check-then-act race exploited by concurrent callers (see -// issue #25468 followup: bound on concurrent same-endpoint marks). 
+// eliminates the check-then-act race exploited by concurrent callers.
 func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) (wasAlreadyUnavailable bool, err error) {
 	now := time.Now()
 	lc.mapMutex.Lock()
@@ -271,6 +257,14 @@ func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedO
 			unavailableOps: op,
 		}
 	}
+	// Fast path: if the endpoint was already unavailable for this op within
+	// the unavailability window, the route lists cannot change -- skip the
+	// full updateLocked recompute. The only state mutation is the bumped
+	// lastCheckTime, which getPrefAvailableEndpointsLocked observes via
+	// isEndpointUnavailableLocked and which yields the same result.
+	if wasAlreadyUnavailable {
+		return wasAlreadyUnavailable, nil
+	}
 	return wasAlreadyUnavailable, lc.updateLocked(nil, nil, nil, nil)
 }
 
@@ -336,7 +330,7 @@ func (lc *locationCache) getPrefAvailableEndpointsLocked(endpointsByLoc map[stri
 		// itself advertises zero regions). The caller passes a regional
 		// fallback whenever availWriteLocations is non-empty, so the only
 		// time this is the customer-supplied default endpoint is the
-		// degenerate "zero regions" case approved for issue #25468 followup.
+		// degenerate "zero regions" case.
 		endpoints = append(endpoints, fallbackEndpoint)
 	}
 	return endpoints
@@ -368,9 +362,8 @@ func newDatabaseAccountLocationsInfo(prefLocations []string, defaultEndpoint url
 	// successful Update() replaces them with regional endpoints. This is
 	// safe because the pipeline policy (globalEndpointManagerPolicy) blocks
 	// data-plane requests on a synchronous bootstrap and surfaces the GEM
-	// error if it fails -- so resolveServiceEndpoint is never consulted for
-	// a real data-plane request while these seeded values are still in
-	// effect. See issue #25468 followup.
+	// error if it fails, so resolveServiceEndpoint is never consulted for a
+	// real data-plane request while these seeded values are still in effect.
 	writeEndpoints := []url.URL{defaultEndpoint}
 	readEndpoints := []url.URL{defaultEndpoint}
 	return &databaseAccountLocationsInfo{
diff --git a/sdk/data/azcosmos/cosmos_location_cache_test.go b/sdk/data/azcosmos/cosmos_location_cache_test.go
index 0ddf9b82b95b..54f2eb43cfde 100644
--- a/sdk/data/azcosmos/cosmos_location_cache_test.go
+++ b/sdk/data/azcosmos/cosmos_location_cache_test.go
@@ -263,8 +263,7 @@ func TestGetPrefAvailableEndpoints(t *testing.T) {
 	prefWriteEndpoints := lc.getPrefAvailableEndpointsLocked(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, write, lc.defaultEndpoint)
 	// loc2: preferred + available; loc1: unavailable + preferred (moved to
 	// tail). The trailing default-endpoint fallback was removed when we
-	// stopped routing data-plane traffic to the customer-supplied endpoint
-	// (issue #25468 followup).
+	// stopped routing data-plane traffic to the customer-supplied endpoint.
 	expectedWriteEndpoints := []*url.URL{loc2Endpoint, loc1Endpoint}
 
 	for i, endpoint := range expectedWriteEndpoints {
@@ -284,10 +283,9 @@ func TestReadEndpoints(t *testing.T) {
 	}
 	lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second)
 
-	// Before issue #25468 followup the code skipped loc1 in the main pref
-	// loop (because it equalled the write fallback), pushing it to the tail.
-	// Now loc1 is included in preferred order, giving a more intuitive
-	// result.
+	// Previously the code skipped loc1 in the main pref loop (because it
+	// equalled the write fallback), pushing it to the tail. Now loc1 is
+	// included in preferred order, giving a more intuitive result.
 	expectedReadEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc4Endpoint}
 	actualReadEndpoints, err := lc.readEndpoints()
 	if err != nil {
@@ -336,8 +334,8 @@ func TestWriteEndpoints(t *testing.T) {
 	}
 	lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second)
 
-	// Trailing default-endpoint fallback was removed for issue #25468
-	// followup: route lists must contain only regional endpoints.
+	// Trailing default-endpoint fallback was removed: route lists must
+	// contain only regional endpoints.
 	expectedWriteEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc3Endpoint}
 	actualWriteEndpoints, err := lc.writeEndpoints()
 	if err != nil {

From a9aee7cb6f51c046f7e64f9dd5251cd652684849 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Thu, 21 May 2026 19:25:27 -0700
Subject: [PATCH 6/8] azcosmos: deflake TestFix3c on Windows go1.26.1

Under some schedulers (observed on windows_go_1261 CI), every concurrent
gem.Update can complete observing the still-fresh lastUpdateTime before
any of the concurrent MarkEndpointUnavailableForWrite goroutines fires
invalidate(), leaving the post-invalidation refresh un-triggered and the
HTTP call count at 0 instead of the expected 1.

Add an explicit gem.Update after wg.Wait() to deterministically trigger
the post-invalidation refresh. The singleflight + invalidationGen bound
still guarantees the burst-plus-trailing-Update collapses to exactly one
HTTP round-trip, so the assertion (transport.count == 1) stays tight.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
index 742b91001f4c..bca0ea605109 100644
--- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
+++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
@@ -271,6 +271,14 @@ func TestFix3c_ConcurrentSameEndpointMarksAreBounded(t *testing.T) {
 		}()
 	}
 	wg.Wait()
+	// Trigger one more Update after the burst settles. This is required for
+	// scheduler determinism: under some timings every concurrent Update can
+	// complete (observing the still-fresh lastUpdateTime) before any marker
+	// fires invalidate(), leaving the post-invalidation refresh un-triggered.
+	// This explicit post-burst Update guarantees the invalidation is acted on
+	// exactly once -- the singleflight then collapses any in-flight leader
+	// and the new caller to a single HTTP round-trip.
+	_ = gem.Update(context.Background(), false)
 	// Give any spawned refresh time to drain.
 	time.Sleep(200 * time.Millisecond)
 

From 869d0de9114d14079e72cae094629148bbffdb26 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Thu, 21 May 2026 19:42:43 -0700
Subject: [PATCH 7/8] azcosmos: address PR Copilot review comments

- getPrefAvailableEndpointsLocked now takes prefLocations as a parameter
  rather than reading lc.locationInfo.prefLocations. updateLocked passes
  nextLoc.prefLocations so the route lists are computed from a single
  consistent in-progress snapshot, not the still-committed
  lc.locationInfo.
- countingTransport.Do now sets Response.Status (e.g. '200 OK') and
  ContentLength so simulated responses match net/http behavior and SDK
  helpers like NewResponseErrorWithErrorCode produce useful error text.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go | 11 +++++++----
 sdk/data/azcosmos/cosmos_location_cache.go         | 13 +++++++++----
 sdk/data/azcosmos/cosmos_location_cache_test.go    |  2 +-
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
index bca0ea605109..43a84f818222 100644
--- a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
+++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go
@@ -17,6 +17,7 @@ package azcosmos
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"io"
 	"net/http"
 	"net/url"
@@ -56,10 +57,12 @@ func (c *countingTransport) Do(req *http.Request) (*http.Response, error) {
 		status, body = c.respFunc()
 	}
 	resp := &http.Response{
-		StatusCode: status,
-		Header:     http.Header{"Content-Type": []string{"application/json"}},
-		Body:       jsonBody(body),
-		Request:    req,
+		StatusCode:    status,
+		Status:        fmt.Sprintf("%d %s", status, http.StatusText(status)),
+		Header:        http.Header{"Content-Type": []string{"application/json"}},
+		Body:          jsonBody(body),
+		ContentLength: int64(len(body)),
+		Request:       req,
 	}
 	return resp, nil
 }
diff --git a/sdk/data/azcosmos/cosmos_location_cache.go b/sdk/data/azcosmos/cosmos_location_cache.go
index 2a9692f94420..083219b2d1cc 100644
--- a/sdk/data/azcosmos/cosmos_location_cache.go
+++ b/sdk/data/azcosmos/cosmos_location_cache.go
@@ -125,7 +125,7 @@ func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocati
 			writeFallback = ep
 		}
 	}
-	nextLoc.writeEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, write, writeFallback)
+	nextLoc.writeEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, nextLoc.prefLocations, write, writeFallback)
 	// Prefer the first available read region for the read fallback. Only
 	// fall back to the first write endpoint (or, transitively, the default
 	// endpoint) when the account advertises zero read regions -- accounts
@@ -137,7 +137,7 @@ func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocati
 			readFallback = ep
 		}
 	}
-	nextLoc.readEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, read, readFallback)
+	nextLoc.readEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, nextLoc.prefLocations, read, readFallback)
 	lc.lastUpdateTime = time.Now()
 	lc.locationInfo = nextLoc
 	// TODO: log
@@ -309,12 +309,17 @@ func (lc *locationCache) isEndpointUnavailableLocked(endpoint url.URL, ops reque
 	return time.Since(info.lastCheckTime) < lc.unavailableLocationExpirationTime
 }
 
-func (lc *locationCache) getPrefAvailableEndpointsLocked(endpointsByLoc map[string]url.URL, locs []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL {
+// getPrefAvailableEndpointsLocked returns the endpoints for the customer's
+// preferred locations in priority order, with unavailable endpoints moved to
+// the tail. Callers pass prefLocations explicitly so updateLocked can compute
+// route lists from the in-progress nextLoc snapshot rather than the
+// already-committed lc.locationInfo.
+func (lc *locationCache) getPrefAvailableEndpointsLocked(endpointsByLoc map[string]url.URL, locs []string, prefLocations []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL {
 	endpoints := make([]url.URL, 0)
 	if lc.enableCrossRegionRetries {
 		if lc.canUseMultipleWriteLocs() || availOps&read != 0 {
 			unavailEndpoints := make([]url.URL, 0)
-			for _, loc := range lc.locationInfo.prefLocations {
+			for _, loc := range prefLocations {
 				if endpoint, ok := endpointsByLoc[loc]; ok {
 					if lc.isEndpointUnavailableLocked(endpoint, availOps) {
 						unavailEndpoints = append(unavailEndpoints, endpoint)
diff --git a/sdk/data/azcosmos/cosmos_location_cache_test.go b/sdk/data/azcosmos/cosmos_location_cache_test.go
index 518667e50e1f..2d11c03df20b 100644
--- a/sdk/data/azcosmos/cosmos_location_cache_test.go
+++ b/sdk/data/azcosmos/cosmos_location_cache_test.go
@@ -260,7 +260,7 @@ func TestGetPrefAvailableEndpoints(t *testing.T) {
 	}
 	// loc1: unavailable, loc2: available, loc5: non-existent
 	lc.locationInfo.prefLocations = []string{loc1.Name, loc2.Name, "location5"}
-	prefWriteEndpoints := lc.getPrefAvailableEndpointsLocked(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, write, lc.defaultEndpoint)
+	prefWriteEndpoints := lc.getPrefAvailableEndpointsLocked(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, lc.locationInfo.prefLocations, write, lc.defaultEndpoint)
 	// loc2: preferred + available; loc1: unavailable + preferred (moved to
 	// tail). The trailing default-endpoint fallback was removed when we
 	// stopped routing data-plane traffic to the customer-supplied endpoint.

From 0da54189b0693baafc3cfa201b339513cd943a2f Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Thu, 21 May 2026 23:15:33 -0700
Subject: [PATCH 8/8] azcosmos: consolidate CHANGELOG entry to one line

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 sdk/data/azcosmos/CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md
index 7e3f03f50cb8..fdf1421526e4 100644
--- a/sdk/data/azcosmos/CHANGELOG.md
+++ b/sdk/data/azcosmos/CHANGELOG.md
@@ -8,7 +8,7 @@
 
 ### Bugs Fixed
 
-* Fixed excessive `GetDatabaseAccount` HTTP calls and a self-deadlock in `locationCache.readEndpoints`/`writeEndpoints` when using preferred regions. Concurrent refreshes are now coalesced via a single in-flight pattern, refresh failures honour the throttle just like successes, a chronic bootstrap failure surfaces the cached error rather than refresh-storming, and async refresh failures are logged so post-bootstrap topology drift is observable. Also stopped data-plane retries from trailing into the customer-supplied (default) endpoint once account topology is populated; route lists now contain only regional endpoints. `enableCrossRegionRetries=false` still routes single-master writes to the default endpoint, and accounts advertising zero write regions still fall back to it for writes. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815).
+* Fixed excessive `GetDatabaseAccount` HTTP calls when using preferred regions, and stopped data-plane retries from trailing into the customer-supplied (default) endpoint once account topology is populated. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815).
 
 ### Other Changes
 