diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md index fb688849002a..5f5f25f2b966 100644 --- a/sdk/data/azcosmos/CHANGELOG.md +++ b/sdk/data/azcosmos/CHANGELOG.md @@ -8,6 +8,7 @@ ### Bugs Fixed +* Fixed excessive `GetDatabaseAccount` HTTP calls when using preferred regions, and stopped data-plane retries from trailing into the customer-supplied (default) endpoint once account topology is populated. See [PR 26815](https://github.com/Azure/azure-sdk-for-go/pull/26815). * Partition key range cache now serves concurrent callers from a single in-flight refresh per container, and the cached routing map remains readable while a refresh is in progress. The refresh runs on a detached background `context.Background()` so a caller's cancellation no longer aborts the shared fetch for other waiters; each caller continues to honor its own context deadline. See [PR 26855](https://github.com/Azure/azure-sdk-for-go/pull/26855). * Partition key range cache change-feed pagination is now resilient to mid-drain throttling. 429 responses are retried indefinitely (with capped linear backoff + jitter) since the service is explicitly asking the client to slow down, and the pages already accumulated are preserved instead of restarting the drain from page 1 on the next refresh. See [PR 26855](https://github.com/Azure/azure-sdk-for-go/pull/26855). 
diff --git a/sdk/data/azcosmos/cosmos_client_retry_policy.go b/sdk/data/azcosmos/cosmos_client_retry_policy.go index 80784718bab3..73decce4cf24 100644 --- a/sdk/data/azcosmos/cosmos_client_retry_policy.go +++ b/sdk/data/azcosmos/cosmos_client_retry_policy.go @@ -151,7 +151,7 @@ func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, i } } - err := p.gem.Update(req.Raw().Context(), isWriteOperation) + err := p.gem.Update(req.Raw().Context(), true) if err != nil { return false, err } diff --git a/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go new file mode 100644 index 000000000000..43a84f818222 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_dbaccount_refresh_test.go @@ -0,0 +1,775 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +// Regression tests for excess GetDatabaseAccount calls observed with +// preferred regions configured. +// +// These tests cover: +// F1: failed GEM Update is throttled to refreshTimeInterval (lastAttemptTime) +// F2: concurrent Update callers are coalesced into a single HTTP call +// F3: write-retry on 403/WriteForbidden force-refreshes the GEM per attempt +// F4: a failed initial bootstrap Update is retried on the next request +// F5: locationCache.readEndpoints does not deadlock on the stale+unavailable path +// Soak: under sustained mixed load, total GEM calls respect refreshTimeInterval + +package azcosmos + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/stretchr/testify/require" +) + +// countingTransport counts how many requests reach the test endpoint and +// optionally returns a canned status / error so we can simulate +// GetDatabaseAccount behaviour. 
body, when set, is returned as the response +// body. +type countingTransport struct { + count atomic.Int64 + status int + body []byte + respErr error + delay time.Duration + respFunc func() (int, []byte) // when non-nil, overrides status/body per call +} + +func (c *countingTransport) Do(req *http.Request) (*http.Response, error) { + c.count.Add(1) + if c.delay > 0 { + time.Sleep(c.delay) + } + if c.respErr != nil { + return nil, c.respErr + } + status, body := c.status, c.body + if c.respFunc != nil { + status, body = c.respFunc() + } + resp := &http.Response{ + StatusCode: status, + Status: fmt.Sprintf("%d %s", status, http.StatusText(status)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: jsonBody(body), + ContentLength: int64(len(body)), + Request: req, + } + return resp, nil +} + +func jsonBody(b []byte) io.ReadCloser { + return &jsonReadCloser{b: b} +} + +type jsonReadCloser struct { + b []byte + i int +} + +func (r *jsonReadCloser) Read(p []byte) (int, error) { + if r.i >= len(r.b) { + return 0, io.EOF + } + n := copy(p, r.b[r.i:]) + r.i += n + return n, nil +} +func (r *jsonReadCloser) Close() error { return nil } + +func newGEMWithTransport(t *testing.T, preferred []string, transport policy.Transporter, refresh time.Duration) *globalEndpointManager { + t.Helper() + pl := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + gem, err := newGlobalEndpointManager("https://fake.documents.azure.com:443/", pl, preferred, refresh, true) + require.NoError(t, err) + return gem +} + +// ---------------------------------------------------------------------------- +// F1: a failed Update must throttle subsequent refresh attempts to +// refreshTimeInterval. The bug had every subsequent caller re-issuing +// GetDatabaseAccount because lastUpdateTime was only set on success. 
+// We still surface the cached error to callers (see F3b) but the HTTP call +// must not repeat within the throttle window. +// ---------------------------------------------------------------------------- +func TestFix1_FailedUpdateIsThrottled(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // First Update: lastAttemptTime is zero -> shouldRefresh()==true -> HTTP call -> fails. + err := gem.Update(context.Background(), false) + require.Error(t, err, "first attempt against failing endpoint must surface the error") + require.Equal(t, int64(1), transport.count.Load()) + + // Next 50 Update calls within the refresh interval return the cached + // error -- they do NOT re-issue the HTTP call. + for i := 0; i < 50; i++ { + err := gem.Update(context.Background(), false) + require.Error(t, err, "throttled Update must still surface the cached error so callers know the GEM is not populated") + } + require.Equal(t, int64(1), transport.count.Load(), + "a failed Update must be throttled exactly like a successful one") +} + +// ---------------------------------------------------------------------------- +// F2: concurrent Update callers are coalesced into a single in-flight HTTP +// call via the single-in-flight pattern in gem.Update. +// ---------------------------------------------------------------------------- +func TestFix2_ConcurrentUpdateCallersCoalesce(t *testing.T) { + // Slow transport so the first refresh is still in flight while the rest + // of the goroutines arrive. 
+ body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 100 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + const concurrency = 200 + wg := sync.WaitGroup{} + wg.Add(concurrency) + errs := make([]error, concurrency) + for i := 0; i < concurrency; i++ { + go func(idx int) { + defer wg.Done() + errs[idx] = gem.Update(context.Background(), false) + }(i) + } + wg.Wait() + for i, err := range errs { + require.NoErrorf(t, err, "goroutine %d failed", i) + } + require.Equal(t, int64(1), transport.count.Load(), + "concurrent Update callers must coalesce into a single HTTP call") +} + +// ---------------------------------------------------------------------------- +// F3: write-retry on 403/WriteForbidden force-refreshes the GEM on every +// retry attempt so the client picks up topology changes immediately. 
+// ---------------------------------------------------------------------------- +func TestFix3_WriteRetryForceRefreshesGEM(t *testing.T) { + defaultEndpoint, err := url.Parse("https://fake.documents.azure.com:443/") + require.NoError(t, err) + + westRegion := accountRegion{Name: "West US", Endpoint: defaultEndpoint.String()} + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{westRegion}, + WriteRegions: []accountRegion{westRegion}, + }) + transport := &countingTransport{status: http.StatusOK, body: body} + gemPipeline := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", + azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + pipeline: gemPipeline, + preferredLocations: []string{"West US"}, + locationCache: CreateMockLC(*defaultEndpoint, false), + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + retry := &clientRetryPolicy{gem: gem} + + req, err := azruntime.NewRequest(context.Background(), http.MethodPost, defaultEndpoint.String()) + require.NoError(t, err) + + const writeRetries = 5 + rc := &retryContext{} + for i := 0; i < writeRetries; i++ { + shouldRetry, err := retry.attemptRetryOnEndpointFailure(req, true, rc) + require.NoError(t, err) + require.True(t, shouldRetry) + rc.retryCount++ + } + require.Equal(t, int64(writeRetries), transport.count.Load(), + "write retries on 403/WriteForbidden must force-refresh the GEM on every attempt") +} + +// ---------------------------------------------------------------------------- +// F1b: an invalidate() that fires while a refresh is in flight must NOT be +// lost. Before the fix, the in-flight leader's "set lastUpdateTime" would +// overwrite the invalidation timestamps, causing the next caller to observe +// the refresh as completed and skip the refresh that was demanded by the +// invalidation. 
The leader now snapshots invalidationGen and refuses to +// commit timestamps if it changed during the flight. +// ---------------------------------------------------------------------------- +func TestFix1b_InvalidateDuringInflightRefreshIsHonored(t *testing.T) { + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 150 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // Kick off a refresh; while it is in flight, invalidate. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = gem.Update(context.Background(), false) + }() + // Wait deterministically for the leader to claim the in-flight slot. + // A naive time.Sleep is racy on loaded CI hosts; polling hasInflight() + // confirms the leader has actually entered refreshOnce. + require.Eventually(t, gem.hasInflight, time.Second, 2*time.Millisecond, + "leader must claim the in-flight slot within 1s") + gem.invalidate() + wg.Wait() + + // First refresh returned (count=1) but invalidation should have + // prevented timestamps from being committed. A subsequent Update must + // therefore issue a fresh HTTP call (count=2). + require.Equal(t, int64(1), transport.count.Load()) + require.NoError(t, gem.Update(context.Background(), false)) + require.Equal(t, int64(2), transport.count.Load(), + "invalidation during an in-flight refresh must force the next Update to actually refresh") +} + +// ---------------------------------------------------------------------------- +// F3c: concurrent MarkEndpointUnavailable* calls for the same endpoint may +// each observe wasUnavailable==false (the check is not atomic with the +// mark). Each one may call invalidate(). 
The single-in-flight pattern in +// Update bounds the resulting HTTP calls so the user-visible blast radius +// is at most one extra refresh per concurrent burst -- not one per marker. +// This test documents and bounds that behaviour. +// ---------------------------------------------------------------------------- +func TestFix3c_ConcurrentSameEndpointMarksAreBounded(t *testing.T) { + defaultEndpoint, _ := url.Parse("https://fake.documents.azure.com:443/") + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: defaultEndpoint.String()}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: defaultEndpoint.String()}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 50 * time.Millisecond} + gemPipeline := azruntime.NewPipeline("azcosmosgemtest", "v1.0.0", + azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: transport}) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + pipeline: gemPipeline, + preferredLocations: []string{"West US"}, + locationCache: CreateMockLC(*defaultEndpoint, false), + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + + const concurrency = 50 + wg := sync.WaitGroup{} + wg.Add(concurrency * 2) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + _ = gem.MarkEndpointUnavailableForWrite(*defaultEndpoint) + }() + go func() { + defer wg.Done() + _ = gem.Update(context.Background(), false) + }() + } + wg.Wait() + // Trigger one more Update after the burst settles. This is required for + // scheduler determinism: under some timings every concurrent Update can + // complete (observing the still-fresh lastUpdateTime) before any marker + // fires invalidate(), leaving the post-invalidation refresh un-triggered. 
+ // This explicit post-burst Update guarantees the invalidation is acted on + // exactly once -- the singleflight then collapses any in-flight leader + // and the new caller to a single HTTP round-trip. + _ = gem.Update(context.Background(), false) + // Give any spawned refresh time to drain. + time.Sleep(200 * time.Millisecond) + + // With the atomic check-and-mark in markEndpointUnavailable, only the + // FIRST goroutine to win the mapMutex Lock observes wasAlreadyUnavailable + // == false and triggers invalidate(). All other markers see true and + // skip the invalidation. Combined with the in-flight singleflight in + // gem.Update, the upper bound is provably 1 HTTP call -- the leader's + // refresh handles the single invalidation, and waiters share its result. + // If invalidate were to land AFTER the leader started its flight + // (impossible here because the test fixture starts with a non-stale + // lastUpdateTime and the marker is the trigger for the first refresh), + // invalidationGen would advance mid-flight and a second leader would + // fire -- so 2 is the theoretical maximum across all timings. We + // assert the tight bound to guard against regressions in atomicity. + calls := transport.count.Load() + require.Equal(t, int64(1), calls, + "concurrent same-endpoint marks must collapse to exactly 1 GEM call (got %d for concurrency=%d)", calls, concurrency) +} + +// to every request while the GEM has never been successfully populated, not +// just the very first request. The cached lastUpdateErr is returned from +// throttled Update calls when !populated. 
+// ---------------------------------------------------------------------------- +func TestFix3b_BootstrapFailureIsSurfacedOnEveryRequestUntilThrottleExpires(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + for i := 0; i < 5; i++ { + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r) + require.Error(t, err, "request %d must surface the cached bootstrap error", i) + } + require.Equal(t, int64(1), transport.count.Load(), + "only one actual HTTP call must be made; the rest return the cached error") +} + +// resettableOnce only latches on success, so the next caller retries the +// bootstrap. Combined with F1, the retries are throttled to one per +// refreshTimeInterval -- they don't fan out per request. +// ---------------------------------------------------------------------------- +func TestFix4_InitialBootstrapFailureIsRetriedAndThrottled(t *testing.T) { + transport := &countingTransport{status: http.StatusBadRequest, delay: 5 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + // First request: synchronous bootstrap fires the GEM HTTP call, which + // fails. The error must surface to the caller. 
+ r1, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r1.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r1) + require.Error(t, err, "the failed bootstrap error must surface to the caller") + + first := transport.count.Load() + require.Equal(t, int64(1), first) + + // Next 20 sequential requests must NOT each issue a fresh + // GetDatabaseAccount call. Bootstrap is retried on the next request but + // throttled by lastAttemptTime; subsequent requests inside the throttle + // window see ShouldRefresh()==false and skip the async refresh too. + // Capture the error from each follow-up to confirm the bootstrap path + // is actually entered (returning the cached error) -- this prevents a + // regression where populated() accidentally returned true and the + // bootstrap was silently skipped. + const followUp = 20 + for i := 0; i < followUp; i++ { + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r) + require.Error(t, err, + "follow-up request %d must surface the cached bootstrap error -- if it doesn't, populated() is incorrectly returning true and we're silently routing to an unpopulated GEM", i) + } + // Drain any goroutines that the policy may have spawned. + time.Sleep(200 * time.Millisecond) + + total := transport.count.Load() + require.Equal(t, int64(1), total, + "failed bootstrap must be throttled, not retried on every request (got %d HTTP calls)", total) +} + +// ---------------------------------------------------------------------------- +// F5: locationCache.readEndpoints / writeEndpoints no longer self-deadlock on +// the stale+unavailable refresh path. Before the fix, this hung forever +// because RLock could not be upgraded to Lock inside refreshStaleEndpoints. 
+// ---------------------------------------------------------------------------- +func TestFix5_ReadEndpointsDoesNotDeadlock(t *testing.T) { + defaultEndpoint, _ := url.Parse("https://fake.documents.azure.com:443/") + lc := CreateMockLC(*defaultEndpoint, false) + lc.locationUnavailabilityInfoMap[*defaultEndpoint] = locationUnavailabilityInfo{ + lastCheckTime: time.Now(), + unavailableOps: read, + } + lc.lastUpdateTime = time.Now().Add(-10 * time.Minute) + + done := make(chan error, 1) + go func() { + _, err := lc.readEndpoints() + done <- err + }() + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("readEndpoints deadlocked on the stale+unavailable path -- F5 regression") + } + + done2 := make(chan error, 1) + go func() { + _, err := lc.writeEndpoints() + done2 <- err + }() + select { + case err := <-done2: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("writeEndpoints deadlocked on the stale+unavailable path -- F5 regression") + } +} + +// ---------------------------------------------------------------------------- +// Soak test: a high-concurrency burst against a healthy GEM with the default +// 5-min refresh interval should issue exactly one GetDatabaseAccount call. 
+// ---------------------------------------------------------------------------- +func TestRegression_HealthyHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + if testing.Short() { + t.Skip("500-goroutine soak; skipped under -short") + } + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 20 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + const concurrency = 500 + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, _ = pl.Do(r) + }() + } + wg.Wait() + time.Sleep(300 * time.Millisecond) + + calls := transport.count.Load() + require.Equal(t, int64(1), calls, + "500 concurrent requests on a healthy client must produce exactly one GetDatabaseAccount call (got %d)", calls) +} + +// Soak test variant: the same burst against a FAILING GEM must also produce +// exactly one HTTP call, demonstrating that F1 + F2 together close the +// failure-storm path. 
+func TestRegression_FailingGEMHighConcurrencyStaysAtOneGEMCall(t *testing.T) { + if testing.Short() { + t.Skip("500-goroutine soak; skipped under -short") + } + transport := &countingTransport{status: http.StatusBadRequest, delay: 20 * time.Millisecond} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + const concurrency = 500 + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + r, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, _ = pl.Do(r) + }() + } + wg.Wait() + time.Sleep(300 * time.Millisecond) + + calls := transport.count.Load() + require.Equal(t, int64(1), calls, + "500 concurrent requests against a failing GEM must still produce only one GetDatabaseAccount call (got %d)", calls) +} + +// ---------------------------------------------------------------------------- +// Default-endpoint elimination. +// After the GEM is populated, data-plane requests must never resolve to the +// customer-supplied endpoint -- with the single exception of the degenerate +// "zero write regions" case for write requests. +// ---------------------------------------------------------------------------- + +// makeGEMWithRegions builds a populated GEM whose default endpoint host +// differs from every account-region host, so a routing fallback to the +// default endpoint is observable. 
+func makeGEMWithRegions(t *testing.T, isMultiMaster bool, preferred []string, enableCrossRegion bool) *globalEndpointManager { + t.Helper() + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: enableCrossRegion, + enableMultipleWriteLocations: isMultiMaster, + } + writeRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + } + if isMultiMaster { + writeRegions = append(writeRegions, accountRegion{Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}) + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + {Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, + {Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}, + } + require.NoError(t, lc.update(writeRegions, readRegions, preferred, &isMultiMaster)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), + preferredLocations: preferred, + locationCache: lc, + refreshTimeInterval: defaultExpirationTime, + lastUpdateTime: time.Now(), + } + return gem +} + +func TestDefaultEndpointElim_DataPlaneNeverHitsDefaultWhenPopulated(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + + cases := []struct { + name string + multi bool + preferred []string + }{ + {"singleMaster_withPreferred", false, []string{"West US", "East US"}}, + {"singleMaster_noPreferred", false, []string{}}, + {"multiMaster_withPreferred", true, []string{"Central US", "East US"}}, + {"multiMaster_noPreferred", true, []string{}}, + } + resourceTypes := []resourceType{resourceTypeDocument, resourceTypeCollection, resourceTypeDatabase} + + for _, tc := range cases { + t.Run(tc.name, 
func(t *testing.T) { + gem := makeGEMWithRegions(t, tc.multi, tc.preferred, true) + for _, rt := range resourceTypes { + for _, isWrite := range []bool{false, true} { + for _, useWrite := range []bool{false, true} { + for idx := 0; idx < 25; idx++ { + ep := gem.ResolveServiceEndpoint(idx, rt, isWrite, useWrite) + require.NotEqual(t, defaultHost, ep.Host, + "resourceType=%v isWrite=%v useWrite=%v idx=%d resolved to default endpoint", rt, isWrite, useWrite, idx) + } + } + } + } + }) + } +} + +func TestDefaultEndpointElim_SessionRetryOnSingleMasterRoutesToRegionalWrite(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + gem := makeGEMWithRegions(t, false /*single master*/, []string{"West US"}, true) + + // Simulate the session-not-available retry: useWriteEndpoint=true on a + // single-master read. The resolved endpoint must be the regional write + // region, not the customer endpoint. + ep := gem.ResolveServiceEndpoint(1, resourceTypeDocument, false /*isWrite*/, true /*useWrite*/) + require.NotEqual(t, defaultHost, ep.Host) + require.Contains(t, ep.Host, "east-us") +} + +func TestDefaultEndpointElim_UnmatchedPreferredRegionRoutesToRegional(t *testing.T) { + defaultHost := "customer-endpoint.documents.azure.com:443" + // Customer set preferred regions that the account does NOT advertise. 
+ gem := makeGEMWithRegions(t, false, []string{"South Africa North", "France Central"}, true) + + for idx := 0; idx < 10; idx++ { + readEP := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false, false) + require.NotEqual(t, defaultHost, readEP.Host, + "read with unmatched preferred region resolved to default endpoint at idx=%d", idx) + writeEP := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, true, false) + require.NotEqual(t, defaultHost, writeEP.Host, + "write with unmatched preferred region resolved to default endpoint at idx=%d", idx) + } +} + +func TestDefaultEndpointElim_ZeroWriteRegionsRetainsDefaultFallback(t *testing.T) { + // Degenerate account metadata: no write regions advertised. Per the + // project decision, this is the ONLY remaining data-plane code path + // where a write request routes to the customer-supplied endpoint. + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: true, + enableMultipleWriteLocations: false, + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + } + noWrites := []accountRegion{} // explicit zero + multi := false + require.NoError(t, lc.update(noWrites, readRegions, []string{}, &multi)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), locationCache: lc, + refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), + } + + ep := gem.ResolveServiceEndpoint(0, resourceTypeDocument, true /*isWrite*/, false) + require.Equal(t, defaultEndpoint.Host, ep.Host, + "zero-write-regions write must still route to the customer endpoint (documented degenerate case)") +} + +func TestDefaultEndpointElim_CrossRegionRetriesDisabledUsesDefaultEndpoint(t *testing.T) { + 
defaultHost := "customer-endpoint.documents.azure.com:443" + gem := makeGEMWithRegions(t, false, []string{"East US"}, false /*enableCrossRegion=false*/) + + // With cross-region retries disabled, single-master writes route to the + // default (customer-supplied) endpoint -- preserving the long-standing + // behavior of this flag. + for idx := 0; idx < 10; idx++ { + ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, true, false) + require.Equal(t, defaultHost, ep.Host, + "cross-region-retries=false must route writes to the default endpoint at idx=%d", idx) + } +} + +// TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion verifies the +// corollary of TestDefaultEndpointElim_ZeroWriteRegionsRetainsDefaultFallback: +// when the account advertises zero write regions, READS must still resolve to +// an advertised read region rather than the customer endpoint. Only the +// write request in this scenario is allowed to fall through to default. +func TestDefaultEndpointElim_ZeroWriteRegionsReadGoesToReadRegion(t *testing.T) { + defaultEndpoint, _ := url.Parse("https://customer-endpoint.documents.azure.com:443/") + lc := &locationCache{ + defaultEndpoint: *defaultEndpoint, + locationUnavailabilityInfoMap: map[url.URL]locationUnavailabilityInfo{}, + unavailableLocationExpirationTime: defaultExpirationTime, + enableCrossRegionRetries: true, + enableMultipleWriteLocations: false, + } + readRegions := []accountRegion{ + {Name: "East US", Endpoint: "https://east-us.documents.azure.com:443/"}, + {Name: "Central US", Endpoint: "https://central-us.documents.azure.com:443/"}, + } + multi := false + require.NoError(t, lc.update([]accountRegion{}, readRegions, []string{}, &multi)) + gem := &globalEndpointManager{ + clientEndpoint: defaultEndpoint.String(), locationCache: lc, + refreshTimeInterval: defaultExpirationTime, lastUpdateTime: time.Now(), + } + + for idx := 0; idx < 10; idx++ { + ep := gem.ResolveServiceEndpoint(idx, resourceTypeDocument, false /*isWrite*/, 
false) + require.NotEqual(t, defaultEndpoint.Host, ep.Host, + "reads must route to a read region even when there are zero write regions; got default at idx=%d", idx) + } +} + +// ---------------------------------------------------------------------------- +// F7: a transient post-invalidation refresh failure must NOT stall the data +// plane for the full refreshTimeInterval. Once the GEM has ever been +// populated, requests should keep routing through the cached topology even +// while a refresh attempt is throttled after a failure. This guards the +// regression flagged by the deep reviewer where invalidate() zeroed +// lastUpdateTime, which in turn made populated() return false and caused +// the policy's bootstrap path to surface the cached error on every request. +// ---------------------------------------------------------------------------- +func TestFix7_InvalidateThenRefreshFailureDoesNotStallDataPlane(t *testing.T) { + // Programmable transport: succeed once (bootstrap), then fail. 
+ responses := []int{http.StatusOK, http.StatusBadRequest, http.StatusBadRequest} + idx := atomic.Int32{} + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://west-us.documents.azure.com:443/"}}, + }) + transport := &countingTransport{} + transport.respFunc = func() (int, []byte) { + i := idx.Add(1) - 1 + if int(i) < len(responses) { + status := responses[i] + if status == http.StatusOK { + return status, body + } + return status, nil + } + return http.StatusBadRequest, nil + } + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + pol := &globalEndpointManagerPolicy{gem: gem} + + downstream := &countingTransport{status: http.StatusOK} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: []policy.Policy{pol}}, + &policy.ClientOptions{Transport: downstream}) + + // 1. Bootstrap succeeds. + r1, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r1.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err := pl.Do(r1) + require.NoError(t, err, "bootstrap must succeed") + require.True(t, gem.populated(), "GEM must be populated after a successful bootstrap") + + // 2. Simulate a regional 403 -> MarkEndpointUnavailableForWrite -> invalidate. + west, _ := url.Parse("https://west-us.documents.azure.com:443/") + require.NoError(t, gem.MarkEndpointUnavailableForWrite(*west)) + + // 3. populated() must remain true even though lastUpdateTime is zero. + require.True(t, gem.populated(), + "populated() must remain true after invalidate() -- otherwise a transient refresh failure stalls the data plane") + + // 4. A subsequent data-plane request must succeed (routing through the cached + // topology) even if the post-invalidation refresh attempt fails. 
+ r2, _ := azruntime.NewRequest(context.Background(), http.MethodGet, "https://fake.documents.azure.com/") + r2.SetOperationValue(pipelineRequestOptions{resourceType: resourceTypeDocument}) + _, err = pl.Do(r2) + require.NoError(t, err, + "data-plane request must succeed via cached topology even when post-invalidate refresh fails") +} + +// ---------------------------------------------------------------------------- +// F8: a waiter on an in-flight refresh must respect its own ctx deadline, +// not block for the leader's full HTTP round-trip duration. This matters +// because clientRetryPolicy's GEM-refresh calls pass req.Raw().Context() +// (a cancellable user context), so a caller-side timeout must take effect +// promptly even when the user happens to be a waiter rather than the leader. +// ---------------------------------------------------------------------------- +func TestFix8_UpdateWaiterRespectsContextCancellation(t *testing.T) { + if testing.Short() { + t.Skip("blocks on a 2s leader by design; skipped under -short") + } + // Slow leader: holds the in-flight slot for 2 seconds. + body, _ := json.Marshal(accountProperties{ + ReadRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + WriteRegions: []accountRegion{{Name: "West US", Endpoint: "https://fake.documents.azure.com:443/"}}, + }) + transport := &countingTransport{status: http.StatusOK, body: body, delay: 2 * time.Second} + gem := newGEMWithTransport(t, []string{"West US"}, transport, 5*time.Minute) + + // Leader starts a refresh in the background. + leaderDone := make(chan struct{}) + go func() { + defer close(leaderDone) + _ = gem.Update(context.Background(), false) + }() + + // Wait until the leader is actually in-flight via the hasInflight test + // helper rather than peeking at internal mutex state -- keeps the test + // resilient to changes in the GEM's internal synchronization primitives. 
+ require.Eventually(t, gem.hasInflight, 500*time.Millisecond, 5*time.Millisecond, + "leader must claim the in-flight slot within 500ms") + + // Waiter has a 100 ms deadline. It must return promptly with the + // deadline error rather than blocking for the leader's 2-second HTTP + // call to complete. + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + start := time.Now() + err := gem.Update(ctx, false) + waited := time.Since(start) + + require.ErrorIs(t, err, context.DeadlineExceeded, "waiter must surface ctx error") + require.Less(t, waited, 500*time.Millisecond, + "waiter must respect ctx deadline -- waited %v but ctx deadline was 100ms", waited) + + // Clean up: let the leader finish so the test doesn't leak the goroutine. + <-leaderDone +} diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go index fa7b50392c36..cbb7798b4e82 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "time" azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" @@ -24,8 +25,42 @@ type globalEndpointManager struct { preferredLocations []string locationCache *locationCache refreshTimeInterval time.Duration - gemMutex sync.RWMutex + gemMutex sync.Mutex lastUpdateTime time.Time + // lastAttemptTime records the most recent Update attempt regardless of + // outcome. shouldRefresh() honours it so a failed GetAccountProperties is + // throttled to refreshTimeInterval just like a successful one, preventing + // a failure-loop where every caller re-issues the HTTP call immediately. + lastAttemptTime time.Time + // lastUpdateErr is the error from the most recent refresh attempt. 
It is + // returned to callers that hit the throttle window before the GEM has + // ever been successfully populated, so a chronic bootstrap failure is + // surfaced on every request rather than silently swallowed. + lastUpdateErr error + // everPopulated is set true on the first successful refresh and never + // reset, even when invalidate() zeroes lastUpdateTime. populated() reads + // this flag rather than the timestamp, so a transient post-invalidation + // refresh failure does not stall the data plane: requests can still route + // through the existing locationCache topology while the next refresh + // attempt waits for the throttle window. Stored as atomic.Bool so the + // hot-path policy check is lock-free. + everPopulated atomic.Bool + // inflight coalesces concurrent Update callers: only the first does the + // HTTP call; the rest wait on the per-flight done channel and read + // per-flight err. Each refresh has its own *updateFlight so late waiters + // cannot accidentally read a subsequent flight's error. + inflight *updateFlight + // invalidationGen is bumped by invalidate(). The Update leader snapshots + // it before the HTTP call and, if it changed by completion, declines to + // advance lastUpdateTime/lastAttemptTime -- so an invalidation that + // happens during an in-flight refresh is not lost. + invalidationGen uint64 +} + +// updateFlight tracks a single in-flight refresh. 
+type updateFlight struct { + done chan struct{} + err error } func newGlobalEndpointManager(clientEndpoint string, pipeline azruntime.Pipeline, preferredLocations []string, refreshTimeInterval time.Duration, enableCrossRegionRetries bool) (*globalEndpointManager, error) { @@ -59,11 +94,49 @@ func (gem *globalEndpointManager) GetReadEndpoints() ([]url.URL, error) { } func (gem *globalEndpointManager) MarkEndpointUnavailableForWrite(endpoint url.URL) error { - return gem.locationCache.markEndpointUnavailableForWrite(endpoint) + // markEndpointUnavailableForWrite atomically reports whether the + // endpoint was already unavailable for write in the same critical + // section that performs the mark. This eliminates the check-then-act + // race that would otherwise let concurrent markers all observe + // "wasn't unavailable" and each invalidate the GEM. A new event + // invalidates the GEM cache so the next Update(false) actually fires + // -- the first 403/WriteForbidden or network error for an endpoint + // may indicate a failover and we want to learn about new write + // regions promptly. Subsequent retries within the unavailability + // window do not invalidate. 
+ wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForWrite(endpoint) + if err != nil { + return err + } + if !wasAlreadyUnavailable { + gem.invalidate() + } + return nil } func (gem *globalEndpointManager) MarkEndpointUnavailableForRead(endpoint url.URL) error { - return gem.locationCache.markEndpointUnavailableForRead(endpoint) + wasAlreadyUnavailable, err := gem.locationCache.markEndpointUnavailableForRead(endpoint) + if err != nil { + return err + } + if !wasAlreadyUnavailable { + gem.invalidate() + } + return nil +} + +// invalidate forces the next non-force Update to actually issue a refresh by +// clearing both lastUpdateTime and lastAttemptTime, and bumps +// invalidationGen so that a refresh currently in flight cannot mask the +// invalidation by writing the post-call timestamps. Used when we learn about +// a newly-unavailable endpoint and want to discover potential failover +// targets without waiting for the next refresh interval. +func (gem *globalEndpointManager) invalidate() { + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() + gem.lastUpdateTime = time.Time{} + gem.lastAttemptTime = time.Time{} + gem.invalidationGen++ } func (gem *globalEndpointManager) GetEndpointLocation(endpoint url.URL) string { @@ -71,7 +144,7 @@ func (gem *globalEndpointManager) GetEndpointLocation(endpoint url.URL) string { } func (gem *globalEndpointManager) CanUseMultipleWriteLocations() bool { - return gem.locationCache.canUseMultipleWriteLocs() + return gem.locationCache.CanUseMultipleWriteLocs() } func (gem *globalEndpointManager) IsEndpointUnavailable(endpoint url.URL, ops requestedOperations) bool { @@ -82,39 +155,159 @@ func (gem *globalEndpointManager) RefreshStaleEndpoints() { gem.locationCache.refreshStaleEndpoints() } +// populated reports whether the GEM has ever been successfully populated. 
+// Unlike !lastUpdateTime.IsZero(), this remains true after invalidate() +// zeroes the timestamps, so a transient post-invalidation refresh failure +// does not stall the data plane. Lock-free atomic read keeps the policy +// hot path off gemMutex. +func (gem *globalEndpointManager) populated() bool { + return gem.everPopulated.Load() +} + +// hasInflight is a test-only accessor for the in-flight refresh slot. +// Tests use it to wait until a leader has claimed the slot before firing +// follow-up calls; keeping the field access inside the GEM means tests +// don't need to grab gemMutex directly. +func (gem *globalEndpointManager) hasInflight() bool { + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() + return gem.inflight != nil +} + func (gem *globalEndpointManager) ShouldRefresh() bool { - gem.gemMutex.RLock() - defer gem.gemMutex.RUnlock() + gem.gemMutex.Lock() + defer gem.gemMutex.Unlock() return gem.shouldRefresh() } func (gem *globalEndpointManager) shouldRefresh() bool { - return time.Since(gem.lastUpdateTime) > gem.refreshTimeInterval + // Honor whichever happened more recently: a successful update or an + // attempt that failed. Failures must be throttled too, otherwise a + // failing endpoint causes every caller to re-issue GetDatabaseAccount + // immediately. + last := gem.lastUpdateTime + if gem.lastAttemptTime.After(last) { + last = gem.lastAttemptTime + } + return time.Since(last) > gem.refreshTimeInterval } func (gem *globalEndpointManager) ResolveServiceEndpoint(locationIndex int, resourceType resourceType, isWriteOperation, useWriteEndpoint bool) url.URL { return gem.locationCache.resolveServiceEndpoint(locationIndex, resourceType, isWriteOperation, useWriteEndpoint) } +// Update refreshes the GEM cache by calling GetAccountProperties when needed. +// Concurrent callers are coalesced via a single-in-flight pattern so at most +// one HTTP call is in flight per client at any time. 
Both successes and +// failures advance lastAttemptTime so the next refresh is throttled to +// refreshTimeInterval, preventing a failure-storm. +// +// If the GEM has never been successfully populated and the throttle is +// active, Update returns the cached error from the most recent failed +// attempt so a chronic bootstrap failure is surfaced on every request +// rather than silently swallowed. Once the GEM HAS been populated, a +// throttled Update returns nil even after a subsequent refresh failure -- +// the data plane keeps using the cached topology until the throttle expires +// and a fresh refresh can run. +// +// Update respects ctx cancellation for waiters. The leader's HTTP call runs +// under context.WithoutCancel(ctx) so an unrelated caller-side cancellation +// does not poison the shared flight result for other coalesced waiters +// (GetAccountProperties applies its own 60s timeout). Waiters select between +// flight completion and their own ctx.Done() so a caller-side timeout cannot +// be exceeded by an unrelated stuck refresh. func (gem *globalEndpointManager) Update(ctx context.Context, forceRefresh bool) error { gem.gemMutex.Lock() - defer gem.gemMutex.Unlock() if !gem.shouldRefresh() && !forceRefresh { - return nil + // Throttled. Surface the cached error only if we have NEVER + // successfully populated the GEM -- otherwise the data plane has + // a valid cached topology and should continue working until the + // next refresh attempt succeeds. Only non-forced callers reach this + // branch (forceRefresh bypasses the throttle); the cached error may + // come from either a forced or a non-forced refresh attempt. + var cached error + if !gem.everPopulated.Load() { + cached = gem.lastUpdateErr + } + gem.gemMutex.Unlock() + return cached } + if gem.inflight != nil { + // Another goroutine is performing a refresh. Wait for it and share + // its result rather than spawning a duplicate HTTP call.
The result + // lives on the per-flight struct so subsequent flights cannot + // overwrite it. Honour the waiter's ctx so a caller-side timeout + // is not extended by the leader's HTTP call duration. + flight := gem.inflight + gem.gemMutex.Unlock() + select { + case <-flight.done: + return flight.err + case <-ctx.Done(): + return ctx.Err() + } + } + // We are the leader. Publish the inflight flight and snapshot the + // invalidation generation, then release the lock while we perform the + // HTTP call so ShouldRefresh and other non-Update paths don't block on + // a network round-trip. + flight := &updateFlight{done: make(chan struct{})} + gem.inflight = flight + genAtStart := gem.invalidationGen + gem.gemMutex.Unlock() + + // Panic-safe cleanup: if refreshOnce (or anything it transitively calls + // -- the pipeline, JSON unmarshal, locationCache.update) panics, we + // MUST still clear gem.inflight and close flight.done, otherwise every + // subsequent Update caller blocks forever on <-flight.done. We capture + // any panic, record it as the flight error, and re-panic after cleanup. + var err error + defer func() { + r := recover() + gem.gemMutex.Lock() + if r != nil && err == nil { + err = fmt.Errorf("panic in GEM refresh: %v", r) + } + flight.err = err + gem.lastUpdateErr = err + if gem.invalidationGen == genAtStart { + // No invalidation occurred during the flight, so commit the + // timestamps and let the throttle take effect. + gem.lastAttemptTime = time.Now() + if err == nil { + gem.lastUpdateTime = gem.lastAttemptTime + gem.everPopulated.Store(true) + } + } + // If invalidationGen changed, leave the timestamps untouched so + // the next caller observes shouldRefresh()==true and performs a + // fresh refresh that reflects the post-invalidation state. 
+ gem.inflight = nil + gem.gemMutex.Unlock() + close(flight.done) + if r != nil { + panic(r) + } + }() + err = gem.refreshOnce(context.WithoutCancel(ctx)) + return err +} + +// refreshOnce performs the actual GetAccountProperties HTTP call and +// propagates the result into the location cache. It must not hold gemMutex. +func (gem *globalEndpointManager) refreshOnce(ctx context.Context) error { accountProperties, err := gem.GetAccountProperties(ctx) if err != nil { - return fmt.Errorf("failed to retrieve account properties: %v", err) + return fmt.Errorf("failed to retrieve account properties: %w", err) } - err = gem.locationCache.update( + if err := gem.locationCache.update( accountProperties.WriteRegions, accountProperties.ReadRegions, gem.preferredLocations, - &accountProperties.EnableMultipleWriteLocations) - if err != nil { - return fmt.Errorf("failed to update location cache: %v", err) + &accountProperties.EnableMultipleWriteLocations, + ); err != nil { + return fmt.Errorf("failed to update location cache: %w", err) } - gem.lastUpdateTime = time.Now() return nil } diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go index f533330e9725..bb0d1a3d8994 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go @@ -6,28 +6,69 @@ package azcosmos import ( "context" "net/http" - "sync" + "runtime/debug" + "sync/atomic" + azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" ) type globalEndpointManagerPolicy struct { - gem *globalEndpointManager - once sync.Once + gem *globalEndpointManager + // asyncRefreshPending gates the spawn of the async refresh goroutine. 
+ // Without this gate every request that observes ShouldRefresh()==true + // (potentially thousands during a burst that arrives right as the + // throttle expires) would spawn its own goroutine, each of which would + // then queue as a waiter inside gem.Update. The singleflight in Update + // collapses them to one HTTP call, but the goroutine + select overhead + // is wasted. CAS this to true before spawning; the goroutine clears it + // on exit. + asyncRefreshPending atomic.Bool } func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, error) { + // Synchronous bootstrap: while the GEM has never been successfully + // populated, every request synchronously calls Update. Concurrent + // callers are coalesced inside gem.Update via the single-in-flight + // pattern, so at most one HTTP call is in flight. If the call fails + // the throttle in gem.Update (lastAttemptTime) ensures subsequent + // bootstrap retries respect refreshTimeInterval, preventing a + // failure storm. var err error - p.once.Do(func() { + if !p.gem.populated() { // Use the same context, but without the cancellation signal. - // We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request. - err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), true) - }) - if p.gem.ShouldRefresh() { + // We DO want to preserve things like context values, but the GEM + // update needs to complete fully, even if the user cancels the + // triggering request. + err = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) + } + if p.gem.ShouldRefresh() && p.asyncRefreshPending.CompareAndSwap(false, true) { + // Capture the context before launching the goroutine so we do not + // depend on req's lifetime after the policy returns. + refreshCtx := context.WithoutCancel(req.Raw().Context()) go func() { - // Use the same context, but without the cancellation signal. 
- // We DO want to preserve things like context values, but the GEM update needs to complete fully, even if the user cancels the triggering request. - _ = p.gem.Update(context.WithoutCancel(req.Raw().Context()), false) + defer p.asyncRefreshPending.Store(false) + // gem.Update's panic-safe defer re-panics after cleanup. We + // recover here so a panic in the GEM pipeline does not bring + // down the host process via this detached goroutine. The + // recovered value is logged (rather than silently dropped) so + // production crashes remain triageable. + defer func() { + if r := recover(); r != nil { + log.Writef(azlog.EventResponse, + "panic in azcosmos GEM async refresh: %v\n%s", + r, debug.Stack()) + } + }() + // Log refresh failures so a chronically failing GEM is + // observable. Without this, post-bootstrap topology drift is + // silent: the data plane keeps routing to the cached topology + // and callers see no signal until requests start failing. + if err := p.gem.Update(refreshCtx, false); err != nil { + log.Writef(azlog.EventResponse, + "azcosmos GEM async refresh failed: %v", err) + } }() } if p.gem.CanUseMultipleWriteLocations() { diff --git a/sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go b/sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go index 33be2f610eae..b6c997970d80 100644 --- a/sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go +++ b/sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go @@ -18,6 +18,7 @@ import ( azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/internal/mock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type countPolicy struct { @@ -496,6 +497,7 @@ func TestAddedAllowTentativeHeaderGEMPolicy(t *testing.T) { gemServer.SetResponse(mock.WithBody([]byte(mocked_response))) // change time to trigger another get account properties call mockGem.lastUpdateTime = time.Now().Add(-10 * time.Minute) + mockGem.lastAttemptTime = 
time.Now().Add(-10 * time.Minute) // Issue another test request req, err = azruntime.NewRequest(ctx, http.MethodGet, gemServer.URL()) @@ -509,6 +511,12 @@ func TestAddedAllowTentativeHeaderGEMPolicy(t *testing.T) { t.Fatalf("testPipeline.Do failed: %v", err) } + // Wait for the background async refresh to complete so the locationCache + // reflects the new (non-multi-write) topology before the next request. + require.Eventually(t, func() bool { + return !mockGem.CanUseMultipleWriteLocations() + }, 2*time.Second, 5*time.Millisecond, "async GEM refresh must update locationCache within 2s") + // Issue another test request that will use the updated account properties req, err = azruntime.NewRequest(ctx, http.MethodGet, gemServer.URL()) if err != nil { diff --git a/sdk/data/azcosmos/cosmos_location_cache.go b/sdk/data/azcosmos/cosmos_location_cache.go index 51f07e51a280..083219b2d1cc 100644 --- a/sdk/data/azcosmos/cosmos_location_cache.go +++ b/sdk/data/azcosmos/cosmos_location_cache.go @@ -78,7 +78,17 @@ func newLocationCache(prefLocations []string, defaultEndpoint url.URL, enableCro } } +// update refreshes the location cache. It acquires mapMutex internally; do +// not call it while holding mapMutex (use updateLocked from inside such +// sections). Public callers go through update; internal callers that already +// hold the write lock call updateLocked. 
func (lc *locationCache) update(writeLocations []accountRegion, readLocations []accountRegion, prefList []string, enableMultipleWriteLocations *bool) error { + lc.mapMutex.Lock() + defer lc.mapMutex.Unlock() + return lc.updateLocked(writeLocations, readLocations, prefList, enableMultipleWriteLocations) +} + +func (lc *locationCache) updateLocked(writeLocations []accountRegion, readLocations []accountRegion, prefList []string, enableMultipleWriteLocations *bool) error { nextLoc := copyDatabaseAccountLocationsInfo(lc.locationInfo) if prefList != nil { nextLoc.prefLocations = prefList @@ -86,7 +96,7 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations [] if enableMultipleWriteLocations != nil { lc.enableMultipleWriteLocations = *enableMultipleWriteLocations } - lc.refreshStaleEndpoints() + lc.refreshStaleEndpointsLocked() if readLocations != nil { availReadEndpointsByLocation, availReadLocations, err := getEndpointsByLocation(readLocations) if err != nil { @@ -105,8 +115,29 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations [] nextLoc.availWriteLocations = availWriteLocations } - nextLoc.writeEndpoints = lc.getPrefAvailableEndpoints(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, write, lc.defaultEndpoint) - nextLoc.readEndpoints = lc.getPrefAvailableEndpoints(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, read, nextLoc.writeEndpoints[0]) + // Choose regional fallbacks so the route lists never trail into the + // customer-supplied default endpoint. The only data-plane code path + // that may still hit the default endpoint is the degenerate "zero + // write regions on a write" case. 
+ writeFallback := lc.defaultEndpoint + if len(nextLoc.availWriteLocations) > 0 { + if ep, ok := nextLoc.availWriteEndpointsByLocation[nextLoc.availWriteLocations[0]]; ok { + writeFallback = ep + } + } + nextLoc.writeEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availWriteEndpointsByLocation, nextLoc.availWriteLocations, nextLoc.prefLocations, write, writeFallback) + // Prefer the first available read region for the read fallback. Only + // fall back to the first write endpoint (or, transitively, the default + // endpoint) when the account advertises zero read regions -- accounts + // with valid read regions must never resolve a read to the default + // endpoint. + readFallback := nextLoc.writeEndpoints[0] + if len(nextLoc.availReadLocations) > 0 { + if ep, ok := nextLoc.availReadEndpointsByLocation[nextLoc.availReadLocations[0]]; ok { + readFallback = ep + } + } + nextLoc.readEndpoints = lc.getPrefAvailableEndpointsLocked(nextLoc.availReadEndpointsByLocation, nextLoc.availReadLocations, nextLoc.prefLocations, read, readFallback) lc.lastUpdateTime = time.Now() lc.locationInfo = nextLoc // TODO: log @@ -134,27 +165,38 @@ func (lc *locationCache) canUseMultipleWriteLocsToRoute(resourceType resourceTyp return lc.canUseMultipleWriteLocs() && resourceType == resourceTypeDocument } +// readEndpoints returns the cached preferred read endpoints, refreshing the +// stale-endpoint set if the cache hasn't been updated within the +// unavailableLocationExpirationTime AND at least one unavailability entry is +// recorded. The refresh path used to call lc.update while still holding +// mapMutex.RLock, which deadlocks with refreshStaleEndpoints's Lock() +// (sync.RWMutex cannot upgrade RLock to Lock). We now capture the staleness +// decision under RLock, release it, and let update acquire the write lock. 
func (lc *locationCache) readEndpoints() ([]url.URL, error) { lc.mapMutex.RLock() - defer lc.mapMutex.RUnlock() - if time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 { - err := lc.update(nil, nil, nil, nil) - if err != nil { + stale := time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 + lc.mapMutex.RUnlock() + if stale { + if err := lc.update(nil, nil, nil, nil); err != nil { return nil, err } } + lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() return lc.locationInfo.readEndpoints, nil } func (lc *locationCache) writeEndpoints() ([]url.URL, error) { lc.mapMutex.RLock() - defer lc.mapMutex.RUnlock() - if time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 { - err := lc.update(nil, nil, nil, nil) - if err != nil { + stale := time.Since(lc.lastUpdateTime) > lc.unavailableLocationExpirationTime && len(lc.locationUnavailabilityInfoMap) > 0 + lc.mapMutex.RUnlock() + if stale { + if err := lc.update(nil, nil, nil, nil); err != nil { return nil, err } } + lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() return lc.locationInfo.writeEndpoints, nil } @@ -183,35 +225,56 @@ func (lc *locationCache) getLocation(endpoint url.URL) string { return "" } +// canUseMultipleWriteLocs returns whether the account supports multi-master +// writes. Callers that already hold lc.mapMutex use this; the exported +// CanUseMultipleWriteLocs variant takes the read lock first.
func (lc *locationCache) canUseMultipleWriteLocs() bool { return lc.enableMultipleWriteLocations } -func (lc *locationCache) markEndpointUnavailableForRead(endpoint url.URL) error { +func (lc *locationCache) CanUseMultipleWriteLocs() bool { + lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() + return lc.enableMultipleWriteLocations +} + +func (lc *locationCache) markEndpointUnavailableForRead(endpoint url.URL) (wasAlreadyUnavailable bool, err error) { return lc.markEndpointUnavailable(endpoint, read) } -func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) error { +func (lc *locationCache) markEndpointUnavailableForWrite(endpoint url.URL) (wasAlreadyUnavailable bool, err error) { return lc.markEndpointUnavailable(endpoint, write) } -func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) error { +// markEndpointUnavailable atomically samples whether the endpoint was already +// unavailable for `op` and records the unavailability. Returning the prior +// state from inside the same critical section that performs the mark +// eliminates the check-then-act race exploited by concurrent callers. 
+func (lc *locationCache) markEndpointUnavailable(endpoint url.URL, op requestedOperations) (wasAlreadyUnavailable bool, err error) { now := time.Now() lc.mapMutex.Lock() + defer lc.mapMutex.Unlock() if info, ok := lc.locationUnavailabilityInfoMap[endpoint]; ok { + wasAlreadyUnavailable = op&info.unavailableOps == op && + time.Since(info.lastCheckTime) < lc.unavailableLocationExpirationTime info.lastCheckTime = now info.unavailableOps |= op lc.locationUnavailabilityInfoMap[endpoint] = info } else { - info = locationUnavailabilityInfo{ + lc.locationUnavailabilityInfoMap[endpoint] = locationUnavailabilityInfo{ lastCheckTime: now, unavailableOps: op, } - lc.locationUnavailabilityInfoMap[endpoint] = info } - lc.mapMutex.Unlock() - err := lc.update(nil, nil, nil, nil) - return err + // Fast path: if the endpoint was already unavailable for this op within + // the unavailability window, the route lists cannot change -- skip the + // full updateLocked recompute. The only state mutation is the bumped + // lastCheckTime, which getPrefAvailableEndpointsLocked observes via + // isEndpointUnavailableLocked and which yields the same result. 
+ if wasAlreadyUnavailable { + return wasAlreadyUnavailable, nil + } + return wasAlreadyUnavailable, lc.updateLocked(nil, nil, nil, nil) } func (lc *locationCache) databaseAccountRead(dbAcct accountProperties) error { @@ -221,9 +284,12 @@ func (lc *locationCache) databaseAccountRead(dbAcct accountProperties) error { func (lc *locationCache) refreshStaleEndpoints() { lc.mapMutex.Lock() defer lc.mapMutex.Unlock() + lc.refreshStaleEndpointsLocked() +} + +func (lc *locationCache) refreshStaleEndpointsLocked() { for endpoint, info := range lc.locationUnavailabilityInfoMap { - t := time.Since(info.lastCheckTime) - if t > lc.unavailableLocationExpirationTime { + if time.Since(info.lastCheckTime) > lc.unavailableLocationExpirationTime { delete(lc.locationUnavailabilityInfoMap, endpoint) } } @@ -231,23 +297,31 @@ func (lc *locationCache) refreshStaleEndpoints() { func (lc *locationCache) isEndpointUnavailable(endpoint url.URL, ops requestedOperations) bool { lc.mapMutex.RLock() + defer lc.mapMutex.RUnlock() + return lc.isEndpointUnavailableLocked(endpoint, ops) +} + +func (lc *locationCache) isEndpointUnavailableLocked(endpoint url.URL, ops requestedOperations) bool { info, ok := lc.locationUnavailabilityInfoMap[endpoint] - lc.mapMutex.RUnlock() if ops == none || !ok || ops&info.unavailableOps != ops { return false } return time.Since(info.lastCheckTime) < lc.unavailableLocationExpirationTime } -func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url.URL, locs []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL { +// getPrefAvailableEndpointsLocked returns the endpoints for the customer's +// preferred locations in priority order, with unavailable endpoints moved to +// the tail. Callers pass prefLocations explicitly so updateLocked can compute +// route lists from the in-progress nextLoc snapshot rather than the +// already-committed lc.locationInfo. 
+func (lc *locationCache) getPrefAvailableEndpointsLocked(endpointsByLoc map[string]url.URL, locs []string, prefLocations []string, availOps requestedOperations, fallbackEndpoint url.URL) []url.URL { endpoints := make([]url.URL, 0) if lc.enableCrossRegionRetries { if lc.canUseMultipleWriteLocs() || availOps&read != 0 { unavailEndpoints := make([]url.URL, 0) - unavailEndpoints = append(unavailEndpoints, fallbackEndpoint) - for _, loc := range lc.locationInfo.prefLocations { - if endpoint, ok := endpointsByLoc[loc]; ok && endpoint != fallbackEndpoint { - if lc.isEndpointUnavailable(endpoint, availOps) { + for _, loc := range prefLocations { + if endpoint, ok := endpointsByLoc[loc]; ok { + if lc.isEndpointUnavailableLocked(endpoint, availOps) { unavailEndpoints = append(unavailEndpoints, endpoint) } else { endpoints = append(endpoints, endpoint) @@ -264,6 +338,13 @@ func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url } } if len(endpoints) == 0 { + // Last resort: none of the customer's preferred regions matched the + // account's regions (or cross-region retries are off and the + // non-multi-master read branch yielded nothing, or the account + // itself advertises zero regions). The caller passes a regional + // fallback whenever availWriteLocations is non-empty, so the only + // time this is the customer-supplied default endpoint is the + // degenerate "zero regions" case. endpoints = append(endpoints, fallbackEndpoint) } return endpoints @@ -291,6 +372,12 @@ func newDatabaseAccountLocationsInfo(prefLocations []string, defaultEndpoint url availReadLocs := make([]string, 0) availWriteEndpointsByLocation := make(map[string]url.URL) availReadEndpointsByLocation := make(map[string]url.URL) + // Pre-populated seed: the lists contain defaultEndpoint until the first + // successful Update() replaces them with regional endpoints. 
This is + // safe because the pipeline policy (globalEndpointManagerPolicy) blocks + // data-plane requests on a synchronous bootstrap and surfaces the GEM + // error if it fails, so resolveServiceEndpoint is never consulted for a + // real data-plane request while these seeded values are still in effect. writeEndpoints := []url.URL{defaultEndpoint} readEndpoints := []url.URL{defaultEndpoint} return &databaseAccountLocationsInfo{ diff --git a/sdk/data/azcosmos/cosmos_location_cache_test.go b/sdk/data/azcosmos/cosmos_location_cache_test.go index 43e797ff516b..3231fa12bf0f 100644 --- a/sdk/data/azcosmos/cosmos_location_cache_test.go +++ b/sdk/data/azcosmos/cosmos_location_cache_test.go @@ -94,7 +94,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { lc := ResetLocationCache() var firstCheckTime time.Time // mark endpoint unavailable for first time - err := lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -111,7 +111,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { } // mark endpoint unavailable for second time time.Sleep(100 * time.Millisecond) - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -131,7 +131,7 @@ func TestMarkEndpointUnavailable(t *testing.T) { func TestRefreshStaleEndpoints(t *testing.T) { lc := ResetLocationCache() // mark endpoint unavailable for first time - err := lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -150,11 +150,11 @@ func TestRefreshStaleEndpoints(t *testing.T) { func TestIsEndpointUnavailable(t *testing.T) { lc := ResetLocationCache() - err := 
lc.markEndpointUnavailableForRead(*loc1Endpoint) + _, err := lc.markEndpointUnavailableForRead(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - err = lc.markEndpointUnavailableForWrite(*loc2Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc2Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } @@ -262,15 +262,17 @@ func TestGetPrefAvailableEndpoints(t *testing.T) { t.Fatalf("Received error Reading DB account: %s", err.Error()) } // marks loc1 unavailable, which will put it last in the preferred available endpoint list - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } // loc1: unavailable, loc2: available, loc5: non-existent lc.locationInfo.prefLocations = []string{loc1.Name, loc2.Name, "location5"} - prefWriteEndpoints := lc.getPrefAvailableEndpoints(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, write, lc.defaultEndpoint) - // loc2: preferred + available, default: fallback endpoint, loc1: unavailable + preferred - expectedWriteEndpoints := []*url.URL{loc2Endpoint, defaultEndpoint, loc1Endpoint} + prefWriteEndpoints := lc.getPrefAvailableEndpointsLocked(lc.locationInfo.availWriteEndpointsByLocation, lc.locationInfo.availWriteLocations, lc.locationInfo.prefLocations, write, lc.defaultEndpoint) + // loc2: preferred + available; loc1: unavailable + preferred (moved to + // tail). The trailing default-endpoint fallback was removed when we + // stopped routing data-plane traffic to the customer-supplied endpoint. 
+ expectedWriteEndpoints := []*url.URL{loc2Endpoint, loc1Endpoint} for i, endpoint := range expectedWriteEndpoints { if endpoint.String() != prefWriteEndpoints[i].String() { @@ -289,7 +291,10 @@ func TestReadEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - expectedReadEndpoints := []*url.URL{loc2Endpoint, loc4Endpoint, loc1Endpoint} + // Previously the code skipped loc1 in the main pref loop (because it + // equalled the write fallback), pushing it to the tail. Now loc1 is + // included in preferred order, giving a more intuitive result. + expectedReadEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc4Endpoint} actualReadEndpoints, err := lc.readEndpoints() if err != nil { t.Fatalf("Received error getting read endpoints: %s", err.Error()) @@ -305,11 +310,11 @@ func TestReadEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - err = lc.markEndpointUnavailableForRead(*loc2Endpoint) + _, err = lc.markEndpointUnavailableForRead(*loc2Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - expectedReadEndpoints = []*url.URL{loc4Endpoint, loc1Endpoint, loc2Endpoint} + expectedReadEndpoints = []*url.URL{loc1Endpoint, loc4Endpoint, loc2Endpoint} actualReadEndpoints, err = lc.readEndpoints() if err != nil { t.Fatalf("Received error getting read endpoints: %s", err.Error()) @@ -337,7 +342,9 @@ func TestWriteEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - expectedWriteEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc3Endpoint, defaultEndpoint} + // Trailing default-endpoint fallback was removed: route lists must + // contain only regional endpoints. 
+ expectedWriteEndpoints := []*url.URL{loc1Endpoint, loc2Endpoint, loc3Endpoint} actualWriteEndpoints, err := lc.writeEndpoints() if err != nil { t.Fatalf("Received error getting write endpoints: %s", err.Error()) @@ -353,11 +360,11 @@ func TestWriteEndpoints(t *testing.T) { } lc.lastUpdateTime = time.Now().Add(-1*defaultExpirationTime - 1*time.Second) - err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) + _, err = lc.markEndpointUnavailableForWrite(*loc1Endpoint) if err != nil { t.Fatalf("Received error marking endpoint unavailable: %s", err.Error()) } - expectedWriteEndpoints = []*url.URL{loc2Endpoint, loc3Endpoint, defaultEndpoint, loc1Endpoint} + expectedWriteEndpoints = []*url.URL{loc2Endpoint, loc3Endpoint, loc1Endpoint} actualWriteEndpoints, err = lc.writeEndpoints() if err != nil { t.Fatalf("Received error getting write endpoints: %s", err.Error())