diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md index 5c33379ef3e6..991c7ce14ecd 100644 --- a/sdk/data/azcosmos/CHANGELOG.md +++ b/sdk/data/azcosmos/CHANGELOG.md @@ -12,6 +12,7 @@ ### Bugs Fixed +* Fixed `GetChangeFeed` to survive partition splits: customer-supplied `FeedRange`s are now overlap-matched against the routing map, `410/Gone` triggers a cache refresh and bounded retry, split parents expand into per-child queue entries (inheriting the parent's ETag), and the continuation token persists multi-range state across calls. Continuation tokens are guarded against cross-container reuse. See [PR 26768](https://github.com/Azure/azure-sdk-for-go/pull/26768). * Fixed V2 partition key routing: the top 2 bits of the first EPK byte are now masked to stay within the partition key range space [0x00, 0x3F]. Previously, items whose V2 hash started with a byte >= 0x40 could fail routing in ReadMany because the EPK lexicographically exceeded the "FF" range sentinel. See [PR 26723](https://github.com/Azure/azure-sdk-for-go/pull/26723) * Fixed error handling for partition key range calls which would previously cause panics on any error. See [PR 26723](https://github.com/Azure/azure-sdk-for-go/pull/26723) diff --git a/sdk/data/azcosmos/cosmos_change_feed_composite_continuation_token.go b/sdk/data/azcosmos/cosmos_change_feed_composite_continuation_token.go index 744765348fbd..8346813b25e7 100644 --- a/sdk/data/azcosmos/cosmos_change_feed_composite_continuation_token.go +++ b/sdk/data/azcosmos/cosmos_change_feed_composite_continuation_token.go @@ -3,6 +3,8 @@ package azcosmos +import "github.com/Azure/azure-sdk-for-go/sdk/azcore" + // Version 1 is the initial version of the composite continuation token. 
const cosmosCompositeContinuationTokenVersion = 1 @@ -25,3 +27,51 @@ func newCompositeContinuationToken(resourceID string, continuation []changeFeedR Continuation: continuation, } } + +// head returns a pointer to the head queue entry, or nil if the queue is empty. +// Callers MUST NOT mutate the returned entry; use replaceHeadWithChildren or +// advance instead, which preserve queue invariants. +func (t *compositeContinuationToken) head() *changeFeedRange { + if t == nil || len(t.Continuation) == 0 { + return nil + } + return &t.Continuation[0] +} + +// advance rotates the head entry to the tail of the queue, updating its +// ContinuationToken to the freshly-returned ETag from the just-completed +// request. This is the FIFO rotation used after every successful 200 (or +// 304 — both progress the per-range ETag). +// +// No-op if the queue is empty. If newETag is empty, the head's existing +// ContinuationToken is preserved (the service didn't issue a new ETag, +// e.g., on a 304 without one). +func (t *compositeContinuationToken) advance(newETag azcore.ETag) { + if t == nil || len(t.Continuation) == 0 { + return + } + head := t.Continuation[0] + if newETag != "" { + etagCopy := newETag + head.ContinuationToken = &etagCopy + } + + // Rotate: drop head, append to tail. + t.Continuation = append(t.Continuation[1:], head) +} + +// replaceHeadWithChildren replaces the head queue entry with the provided +// child entries, preserving the order of the children at the front of the +// queue. Used when a 410/Gone refresh reveals a split (parent → N children) +// or when initial overlap resolution returns multiple children for one +// customer-supplied FeedRange. +// +// No-op if children is empty (degenerate split that produces no overlap). +// No-op if the queue is empty (caller should not have called this). 
+func (t *compositeContinuationToken) replaceHeadWithChildren(children []changeFeedRange) { + if t == nil || len(t.Continuation) == 0 || len(children) == 0 { + return + } + rest := t.Continuation[1:] + t.Continuation = append(append(make([]changeFeedRange, 0, len(children)+len(rest)), children...), rest...) +} diff --git a/sdk/data/azcosmos/cosmos_change_feed_request_options.go b/sdk/data/azcosmos/cosmos_change_feed_request_options.go index a7ec55823469..84b4604ff560 100644 --- a/sdk/data/azcosmos/cosmos_change_feed_request_options.go +++ b/sdk/data/azcosmos/cosmos_change_feed_request_options.go @@ -4,7 +4,7 @@ package azcosmos import ( - "encoding/json" + "fmt" "strconv" "time" ) @@ -39,55 +39,46 @@ type ChangeFeedOptions struct { ThroughputBucket *int32 } -func (options *ChangeFeedOptions) toHeaders(partitionKeyRanges []partitionKeyRange) *map[string]string { - headers := make(map[string]string) - +// buildRequestHeaders constructs the exact headers needed for a single +// change-feed request against one queue head. Pure builder: callers MUST +// supply the head and the already-resolved PK-range ID (overlap-matched +// via the routing map, NOT exact-matched). +// +// This is the path used by the new queue-driven GetChangeFeed loop. The +// caller-options-level Continuation token is NOT consulted here — the +// queue-head ETag drives If-None-Match because the queue may have been +// split-expanded since the token was issued. +// +// Returns an error when PartitionKey serialization fails — sending a +// change-feed read with a missing PK header would yield an opaque +// server-side error, so we surface the cause to the caller instead. 
+func (options *ChangeFeedOptions) buildRequestHeaders(head changeFeedRange, resolvedPKRangeID string) (map[string]string, error) { + headers := make(map[string]string, 6) headers[cosmosHeaderChangeFeed] = cosmosHeaderValuesChangeFeed - if options.MaxItemCount > 0 { - headers[cosmosHeaderMaxItemCount] = strconv.FormatInt(int64(options.MaxItemCount), 10) - } - - if options.StartFrom != nil { - formatted := options.StartFrom.UTC().Format(time.RFC1123) - headers[cosmosHeaderIfModifiedSince] = formatted - } - - if options.Continuation != nil && *options.Continuation != "" { - var compositeToken compositeContinuationToken - if err := json.Unmarshal([]byte(*options.Continuation), &compositeToken); err == nil && len(compositeToken.Continuation) > 0 { - if compositeToken.Continuation[0].ContinuationToken != nil { - headers[headerIfNoneMatch] = string(*compositeToken.Continuation[0].ContinuationToken) - } - if options.FeedRange == nil { - options.FeedRange = &FeedRange{ - MinInclusive: compositeToken.Continuation[0].MinInclusive, - MaxExclusive: compositeToken.Continuation[0].MaxExclusive, - } - } - } else { - headers[headerIfNoneMatch] = *options.Continuation + if options != nil { + if options.MaxItemCount > 0 { + headers[cosmosHeaderMaxItemCount] = strconv.FormatInt(int64(options.MaxItemCount), 10) } - } - - if options.PartitionKey != nil { - partitionKeyJSON, err := options.PartitionKey.toJsonString() - if err == nil { - headers[cosmosHeaderPartitionKey] = string(partitionKeyJSON) + if options.StartFrom != nil { + headers[cosmosHeaderIfModifiedSince] = options.StartFrom.UTC().Format(time.RFC1123) + } + if options.PartitionKey != nil { + pkJSON, err := options.PartitionKey.toJsonString() + if err != nil { + return nil, fmt.Errorf("ChangeFeedOptions: serializing PartitionKey: %w", err) + } + headers[cosmosHeaderPartitionKey] = string(pkJSON) } } - if options.FeedRange != nil && len(partitionKeyRanges) > 0 { - if id, err := findPartitionKeyRangeID(*options.FeedRange, 
partitionKeyRanges); err == nil { - headers[headerXmsDocumentDbPartitionKeyRangeId] = id - } else { - return nil - } + if head.ContinuationToken != nil && *head.ContinuationToken != "" { + headers[headerIfNoneMatch] = string(*head.ContinuationToken) } - if len(headers) == 0 { - return nil + if resolvedPKRangeID != "" { + headers[headerXmsDocumentDbPartitionKeyRangeId] = resolvedPKRangeID } - return &headers + return headers, nil } diff --git a/sdk/data/azcosmos/cosmos_change_feed_request_options_test.go b/sdk/data/azcosmos/cosmos_change_feed_request_options_test.go index 465add67d048..dae6e108f959 100644 --- a/sdk/data/azcosmos/cosmos_change_feed_request_options_test.go +++ b/sdk/data/azcosmos/cosmos_change_feed_request_options_test.go @@ -4,265 +4,94 @@ package azcosmos import ( - "encoding/json" "testing" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/stretchr/testify/require" ) -func TestChangeFeedOptionsToHeaders(t *testing.T) { +// TestChangeFeedOptions_BuildRequestHeaders_Defaults verifies the minimum +// header set produced for an empty options value: only the change-feed AIM +// header is set; no MaxItemCount, no IfModifiedSince, no PartitionKey. 
+func TestChangeFeedOptions_BuildRequestHeaders_Defaults(t *testing.T) { options := &ChangeFeedOptions{} - headers := options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - - h := *headers - if h[cosmosHeaderChangeFeed] != cosmosHeaderValuesChangeFeed { - t.Errorf("Expected default AIM to be %v, got %v", cosmosHeaderValuesChangeFeed, h[cosmosHeaderChangeFeed]) - } - - options.MaxItemCount = 10 - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - if h[cosmosHeaderMaxItemCount] != "10" { - t.Errorf("Expected MaxItemCount to be 10, got %v", h[cosmosHeaderMaxItemCount]) - } - - continuation := "test-etag" - options.Continuation = &continuation - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - if h[headerIfNoneMatch] != "test-etag" { - t.Errorf("Expected IfNoneMatch to be \"test-etag\", got %v", h[headerIfNoneMatch]) - } - - now := time.Now().UTC() - options.StartFrom = &now - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - expectedIfModifiedSince := now.Format(time.RFC1123) - if h[cosmosHeaderIfModifiedSince] != expectedIfModifiedSince { - t.Errorf("Expected IfModifiedSince to be %v, got %v", expectedIfModifiedSince, h[cosmosHeaderIfModifiedSince]) - } - - pk := NewPartitionKeyString("pkvalue") - options.PartitionKey = &pk - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - pkJSON, _ := pk.toJsonString() - if h[cosmosHeaderPartitionKey] != string(pkJSON) { - t.Errorf("Expected PartitionKey to be %v, got %v", string(pkJSON), h[cosmosHeaderPartitionKey]) - } - - feedRange := &FeedRange{ - MinInclusive: "00", - MaxExclusive: "FF", - } - options.FeedRange = feedRange - - partitionKeyRanges := []partitionKeyRange{ - { - ID: "0", - MinInclusive: "00", - 
MaxExclusive: "FF", - }, - } - - headers = options.toHeaders(partitionKeyRanges) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - if h[headerXmsDocumentDbPartitionKeyRangeId] != "0" { - t.Errorf("Expected partition key range ID to be 0, got %v", h[headerXmsDocumentDbPartitionKeyRangeId]) - } - - partitionKeyRangesNoMatch := []partitionKeyRange{ - { - ID: "1", - MinInclusive: "AA", - MaxExclusive: "BB", - }, - } - - headers = options.toHeaders(partitionKeyRangesNoMatch) - if headers != nil { - t.Errorf("Expected nil headers when no matching partition key range found") - } - - options.FeedRange = nil - - emptyContinuation := "" - options.Continuation = &emptyContinuation - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - if _, exists := h[headerIfNoneMatch]; exists { - t.Errorf("Expected no IfNoneMatch header for empty continuation") - } - - options.Continuation = nil - headers = options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - h = *headers - if _, exists := h[headerIfNoneMatch]; exists { - t.Errorf("Expected no IfNoneMatch header for nil continuation") - } + headers, err := options.buildRequestHeaders(changeFeedRange{}, "") + require.NoError(t, err) + require.Equal(t, cosmosHeaderValuesChangeFeed, headers[cosmosHeaderChangeFeed]) + _, hasMaxItem := headers[cosmosHeaderMaxItemCount] + require.False(t, hasMaxItem, "MaxItemCount header must be omitted when 0") + _, hasIfModified := headers[cosmosHeaderIfModifiedSince] + require.False(t, hasIfModified, "IfModifiedSince must be omitted when StartFrom is nil") + _, hasPK := headers[cosmosHeaderPartitionKey] + require.False(t, hasPK, "PartitionKey header must be omitted when PartitionKey is nil") + _, hasIfNoneMatch := headers[headerIfNoneMatch] + require.False(t, hasIfNoneMatch, "IfNoneMatch must be omitted when head ContinuationToken is nil") + _, hasPKRangeID := 
headers[headerXmsDocumentDbPartitionKeyRangeId] + require.False(t, hasPKRangeID, "PK-range-id header must be omitted when resolvedPKRangeID is empty") } -func TestChangeFeedOptionsToHeadersWithAllFields(t *testing.T) { +// TestChangeFeedOptions_BuildRequestHeaders_AllFields verifies that every +// supported field of ChangeFeedOptions plus the resolved head/PK-range ID +// is materialized into the expected header. +func TestChangeFeedOptions_BuildRequestHeaders_AllFields(t *testing.T) { now := time.Now().UTC() pk := NewPartitionKeyString("testPK") - continuation := "test-continuation" - feedRange := &FeedRange{ - MinInclusive: "10", - MaxExclusive: "20", - } + etag := azcore.ETag("\"etag-12345\"") options := &ChangeFeedOptions{ MaxItemCount: 25, StartFrom: &now, PartitionKey: &pk, - FeedRange: feedRange, - Continuation: &continuation, } - - partitionKeyRanges := []partitionKeyRange{ - { - ID: "range1", - MinInclusive: "10", - MaxExclusive: "20", - }, + head := changeFeedRange{ + MinInclusive: "10", + MaxExclusive: "20", + ContinuationToken: &etag, } - headers := options.toHeaders(partitionKeyRanges) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } + headers, err := options.buildRequestHeaders(head, "range1") + require.NoError(t, err) - h := *headers - if h[cosmosHeaderMaxItemCount] != "25" { - t.Errorf("Expected MaxItemCount to be 25, got %v", h[cosmosHeaderMaxItemCount]) - } - - expectedIfModifiedSince := now.Format(time.RFC1123) - if h[cosmosHeaderIfModifiedSince] != expectedIfModifiedSince { - t.Errorf("Expected IfModifiedSince to be %v, got %v", expectedIfModifiedSince, h[cosmosHeaderIfModifiedSince]) - } + require.Equal(t, cosmosHeaderValuesChangeFeed, headers[cosmosHeaderChangeFeed]) + require.Equal(t, "25", headers[cosmosHeaderMaxItemCount]) + require.Equal(t, now.Format(time.RFC1123), headers[cosmosHeaderIfModifiedSince]) pkJSON, _ := pk.toJsonString() - if h[cosmosHeaderPartitionKey] != string(pkJSON) { - t.Errorf("Expected 
PartitionKey to be %v, got %v", string(pkJSON), h[cosmosHeaderPartitionKey]) - } - - if h[headerXmsDocumentDbPartitionKeyRangeId] != "range1" { - t.Errorf("Expected partition key range ID to be range1, got %v", h[headerXmsDocumentDbPartitionKeyRangeId]) - } + require.Equal(t, string(pkJSON), headers[cosmosHeaderPartitionKey]) - if h[headerIfNoneMatch] != continuation { - t.Errorf("Expected IfNoneMatch to be %v, got %v", continuation, h[headerIfNoneMatch]) - } - - if h[cosmosHeaderChangeFeed] != cosmosHeaderValuesChangeFeed { - t.Errorf("Expected AIM to be %v, got %v", cosmosHeaderValuesChangeFeed, h[cosmosHeaderChangeFeed]) - } + require.Equal(t, "range1", headers[headerXmsDocumentDbPartitionKeyRangeId]) + require.Equal(t, string(etag), headers[headerIfNoneMatch]) } -func TestChangeFeedOptionsCompositeContinuationToken(t *testing.T) { - etag := azcore.ETag("test-etag") - cfRange := newChangeFeedRange("00", "FF", &ChangeFeedRangeOptions{ - ContinuationToken: &etag, - }) - compositeToken := newCompositeContinuationToken("test-resource-id", []changeFeedRange{cfRange}) - - tokenBytes, err := json.Marshal(compositeToken) - if err != nil { - t.Fatalf("Failed to marshal composite token: %v", err) - } - tokenString := string(tokenBytes) - - options := &ChangeFeedOptions{ - Continuation: &tokenString, - } - - headers := options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - - h := *headers - - if h[headerIfNoneMatch] != string(etag) { - t.Errorf("Expected IfNoneMatch to be %v, got %v", string(etag), h[headerIfNoneMatch]) - } - - if options.FeedRange == nil { - t.Fatal("Expected FeedRange to be set from composite token") - } - if options.FeedRange.MinInclusive != "00" { - t.Errorf("Expected FeedRange.MinInclusive to be 00, got %v", options.FeedRange.MinInclusive) - } - if options.FeedRange.MaxExclusive != "FF" { - t.Errorf("Expected FeedRange.MaxExclusive to be FF, got %v", options.FeedRange.MaxExclusive) - } +// 
TestChangeFeedOptions_BuildRequestHeaders_EmptyContinuationOmitsIfNoneMatch +// verifies that an explicitly-set-but-empty continuation token does not +// produce an IfNoneMatch header (would otherwise cause the server to treat +// every read as conditional against the empty string). +func TestChangeFeedOptions_BuildRequestHeaders_EmptyContinuationOmitsIfNoneMatch(t *testing.T) { + emptyETag := azcore.ETag("") + head := changeFeedRange{ + MinInclusive: "00", + MaxExclusive: "FF", + ContinuationToken: &emptyETag, + } + + headers, err := (&ChangeFeedOptions{}).buildRequestHeaders(head, "0") + require.NoError(t, err) + _, exists := headers[headerIfNoneMatch] + require.False(t, exists, "empty ContinuationToken must NOT produce an IfNoneMatch header") } -func TestChangeFeedOptionsCompositeContinuationTokenWithExistingFeedRange(t *testing.T) { - etag := azcore.ETag("test-etag") - cfRange := newChangeFeedRange("00", "FF", &ChangeFeedRangeOptions{ - ContinuationToken: &etag, - }) - compositeToken := newCompositeContinuationToken("test-resource-id", []changeFeedRange{cfRange}) - - tokenBytes, err := json.Marshal(compositeToken) - if err != nil { - t.Fatalf("Failed to marshal composite token: %v", err) - } - tokenString := string(tokenBytes) - - explicitFeedRange := &FeedRange{ - MinInclusive: "AA", - MaxExclusive: "BB", - } - - options := &ChangeFeedOptions{ - Continuation: &tokenString, - FeedRange: explicitFeedRange, - } - - headers := options.toHeaders(nil) - if headers == nil { - t.Fatal("toHeaders should return non-nil") - } - - h := *headers - - if h[headerIfNoneMatch] != string(etag) { - t.Errorf("Expected IfNoneMatch to be %v, got %v", string(etag), h[headerIfNoneMatch]) - } - - if options.FeedRange.MinInclusive != "AA" { - t.Errorf("Expected FeedRange.MinInclusive to remain AA, got %v", options.FeedRange.MinInclusive) - } - if options.FeedRange.MaxExclusive != "BB" { - t.Errorf("Expected FeedRange.MaxExclusive to remain BB, got %v", options.FeedRange.MaxExclusive) - 
} +// TestChangeFeedOptions_BuildRequestHeaders_PartitionKeySerializationError +// exercises the new error-returning contract: a PartitionKey whose +// serialization fails is surfaced to the caller rather than silently +// dropped (the original-bug pattern from toHeaders). +func TestChangeFeedOptions_BuildRequestHeaders_PartitionKeySerializationError(t *testing.T) { + // json.Marshal cannot serialize a channel; toJsonString surfaces that error. + pk := NewPartitionKey() + pk.values = []interface{}{make(chan int)} + options := &ChangeFeedOptions{PartitionKey: &pk} + + _, err := options.buildRequestHeaders(changeFeedRange{}, "") + require.Error(t, err, "buildRequestHeaders must surface PartitionKey serialization errors") } diff --git a/sdk/data/azcosmos/cosmos_change_feed_response.go b/sdk/data/azcosmos/cosmos_change_feed_response.go index f348458ecffc..8d9f7d2aeef3 100644 --- a/sdk/data/azcosmos/cosmos_change_feed_response.go +++ b/sdk/data/azcosmos/cosmos_change_feed_response.go @@ -35,13 +35,19 @@ func newChangeFeedResponse(resp *http.Response) (ChangeFeedResponse, error) { Response: newResponse(resp), } + // Always close the body, including the 304 short-circuit below. The + // drain loop emits one response per queue head, so a quiet container + // can yield N intermediate 304s per GetChangeFeed call — leaking those + // bodies (which still have an http.Response.Body even when empty) until + // GC would compound across calls. 
+ defer func() { _ = resp.Body.Close() }() + if resp.StatusCode == http.StatusNotModified { response.Documents = []json.RawMessage{} response.Count = 0 return response, nil } - defer func() { _ = resp.Body.Close() }() body, err := azruntime.Payload(resp) if err != nil { return response, wrapResponseError(err, response.Response) @@ -53,8 +59,19 @@ func newChangeFeedResponse(resp *http.Response) (ChangeFeedResponse, error) { return response, nil } -// PopulateCompositeContinuationToken generates and sets the composite continuation token if a feed range was used +// PopulateCompositeContinuationToken generates and sets the composite continuation +// token from response.FeedRange + response.ETag. Retained for back-compat. +// +// In the multi-range queue-driven GetChangeFeed path, response.ContinuationToken +// is already populated with the multi-range composite token by the drain loop +// itself. This method is therefore a no-op when ContinuationToken is non-empty +// — overwriting it with a single-range token rebuilt from the per-head +// FeedRange would lose the multi-range queue state and cause callers that +// resume with the resulting token to skip subsequent ranges after a split. func (response *ChangeFeedResponse) PopulateCompositeContinuationToken() { + if response.ContinuationToken != "" { + return + } if response.FeedRange != nil && response.ETag != "" { compositeToken, err := response.GetCompositeContinuationToken() if err == nil && compositeToken != "" { diff --git a/sdk/data/azcosmos/cosmos_change_feed_split_test.go b/sdk/data/azcosmos/cosmos_change_feed_split_test.go new file mode 100644 index 000000000000..98a613f20264 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_change_feed_split_test.go @@ -0,0 +1,619 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package azcosmos + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/url" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/internal/mock" + "github.com/stretchr/testify/require" +) + +// returnGoneOnChangeFeedPolicy is the change-feed-side analogue of +// returnGoneOnQueryPolicy in cosmos_container_read_many_test.go: it returns 410/Gone +// with a configurable PK-range substatus on change-feed requests (identified via +// the A-IM = "Incremental feed" header) until maxGone such responses have been +// emitted, after which subsequent calls pass through. +type returnGoneOnChangeFeedPolicy struct { + maxGone int32 + substatus string + count atomic.Int32 +} + +func (p *returnGoneOnChangeFeedPolicy) Do(req *policy.Request) (*http.Response, error) { + // Match only true change-feed reads (against /docs), not the PK-range cache's + // incremental refresh (against /pkranges) which also carries the A-IM header. + if req.Raw().Header.Get(cosmosHeaderChangeFeed) != cosmosHeaderValuesChangeFeed || + !strings.HasSuffix(req.Raw().URL.Path, "/docs") { + return req.Next() + } + n := p.count.Add(1) + if n <= p.maxGone { + headers := http.Header{} + headers.Set(cosmosHeaderSubstatus, p.substatus) + return &http.Response{ + StatusCode: http.StatusGone, + Status: "410 Gone", + Header: headers, + Body: io.NopCloser(strings.NewReader(`{"message":"Gone"}`)), + Request: req.Raw(), + }, nil + } + return req.Next() +} + +// createChangeFeedTestClient mirrors createReadManyTestClient but pre-seeds the PK +// range cache with the supplied physical ranges. Container cache is pre-populated +// with ResourceID=testRID so cross-container validation can be exercised. 
+func createChangeFeedTestClient(t *testing.T, srv *mock.Server, policies []policy.Policy, ranges []partitionKeyRange) *Client { + t.Helper() + defaultEndpoint, err := url.Parse(srv.URL()) + require.NoError(t, err) + + internalClient, err := azcore.NewClient("azcosmostest", "v1.0.0", + azruntime.PipelineOptions{PerCall: policies}, + &policy.ClientOptions{Transport: srv}) + require.NoError(t, err) + + containerCache := newContainerPropertiesCache() + pkRangeCache := newPartitionKeyRangeCache() + gem := &globalEndpointManager{preferredLocations: []string{}} + + client := &Client{ + endpoint: srv.URL(), + endpointUrl: defaultEndpoint, + internal: internalClient, + gem: gem, + caches: &sharedCacheSet{ + containerCache: containerCache, + pkRangeCache: pkRangeCache, + }, + } + + containerLink := "dbs/databaseId/colls/containerId" + containerCache.set(containerLink, &ContainerProperties{ + ID: "containerId", + ResourceID: "testRID", + PartitionKeyDefinition: PartitionKeyDefinition{ + Paths: []string{"/pk"}, + Kind: PartitionKeyKindHash, + Version: 2, + }, + }) + + pkRangeCache.entries["testRID"] = &pkRangeCacheEntry{ + routingMap: newCollectionRoutingMap(ranges, "etag1"), + } + + return client +} + +// TestGetChangeFeed_410Gone_TriggersCacheRefreshAndRetry — Phase 1's headline win: +// when the gateway returns 410/Gone with a PK-range substatus, the change-feed +// request must (a) refresh the PK-range cache, (b) retry on the freshly-resolved +// range, (c) succeed transparently to the caller without losing the response. 
+func TestGetChangeFeed_410Gone_TriggersCacheRefreshAndRetry(t *testing.T) { + srv, closeSrv := mock.NewTLSServer() + defer closeSrv() + + gonePolicy := &returnGoneOnChangeFeedPolicy{maxGone: 1, substatus: subStatusPartitionKeyRangeGone} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{gonePolicy}, ranges) + + containerPropsResp := []byte(`{ + "id": "containerId", + "_rid": "testRID", + "_self": "dbs/db1/colls/containerId/", + "partitionKey": {"paths": ["/pk"], "kind": "Hash", "version": 2} + }`) + pkRangeResp := []byte(`{ + "_rid": "testRID", + "PartitionKeyRanges": [{"_rid": "testRID", "id": "0", "minInclusive": "", "maxExclusive": "FF"}], + "_count": 1 + }`) + cfBody := []byte(`{"_rid":"testRID","Documents":[{"id":"doc1"}],"_count":1}`) + + // Sequence after the policy returns 410 once: + // container props re-fetch → PK ranges refresh → 304 (incremental loop end) + // → change-feed retry succeeds (passes through gonePolicy because maxGone=1) + srv.AppendResponse(mock.WithBody(containerPropsResp), mock.WithStatusCode(200)) + srv.AppendResponse(mock.WithBody(pkRangeResp), mock.WithStatusCode(200), + mock.WithHeader(cosmosHeaderEtag, "etag2")) + srv.AppendResponse(mock.WithStatusCode(304)) + srv.AppendResponse(mock.WithBody(cfBody), mock.WithStatusCode(200), + mock.WithHeader(cosmosHeaderEtag, "\"new-etag\""), + mock.WithHeader(cosmosHeaderRequestCharge, "1.0")) + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{ + FeedRange: &FeedRange{MinInclusive: "00", MaxExclusive: "FF"}, + }) + require.NoError(t, err) + require.Equal(t, 1, resp.Count, "retry must surface the success page from the second attempt") + require.Equal(t, int32(2), gonePolicy.count.Load(), "expected initial 410 + 1 retry on change-feed path") + 
require.NotEmpty(t, resp.ContinuationToken, "response must carry a continuation token after a successful retry") +} + +// TestGetChangeFeed_RetryCapAt3_ReturnsLastErrorOnRepeated410 — when the cache +// keeps returning the same range and 410s keep coming, the loop must surface the +// final 410 to the caller after the initial attempt plus exactly +// maxPKRangeGoneRetries retries. +func TestGetChangeFeed_RetryCapAt3_ReturnsLastErrorOnRepeated410(t *testing.T) { + srv, closeSrv := mock.NewTLSServer() + defer closeSrv() + + gonePolicy := &returnGoneOnChangeFeedPolicy{maxGone: 100, substatus: subStatusPartitionKeyRangeGone} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{gonePolicy}, ranges) + + containerPropsResp := []byte(`{ + "id": "containerId", + "_rid": "testRID", + "_self": "dbs/db1/colls/containerId/", + "partitionKey": {"paths": ["/pk"], "kind": "Hash", "version": 2} + }`) + pkRangeResp := []byte(`{ + "_rid": "testRID", + "PartitionKeyRanges": [{"_rid": "testRID", "id": "0", "minInclusive": "", "maxExclusive": "FF"}], + "_count": 1 + }`) + + // Each retry needs container props + PK ranges (with ETag) + 304 to terminate the + // incremental refresh loop. After maxPKRangeGoneRetries refreshes, the next 410 is + // surfaced to the caller without further retries. 
+ for i := 0; i < maxPKRangeGoneRetries; i++ { + srv.AppendResponse(mock.WithBody(containerPropsResp), mock.WithStatusCode(200)) + srv.AppendResponse(mock.WithBody(pkRangeResp), mock.WithStatusCode(200), + mock.WithHeader(cosmosHeaderEtag, "etag-refresh")) + srv.AppendResponse(mock.WithStatusCode(304)) + } + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + _, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{ + FeedRange: &FeedRange{MinInclusive: "00", MaxExclusive: "FF"}, + }) + require.Error(t, err) + var respErr *azcore.ResponseError + require.ErrorAs(t, err, &respErr) + require.Equal(t, http.StatusGone, respErr.StatusCode) + require.Equal(t, int32(maxPKRangeGoneRetries+1), gonePolicy.count.Load(), + "expected initial attempt + maxPKRangeGoneRetries retries") +} + +// TestGetChangeFeed_TokenResourceIDMismatch_Rejected — the cross-container token +// reuse guard. A token whose ResourceID doesn't match the current container's +// must be rejected loudly; otherwise the EPK boundaries in the token would be +// misinterpreted against the wrong routing map and silent wrong data could leak. 
+func TestGetChangeFeed_TokenResourceIDMismatch_Rejected(t *testing.T) { + srv, closeSrv := mock.NewTLSServer() + defer closeSrv() + + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, nil, ranges) + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + mismatchToken := &compositeContinuationToken{ + Version: cosmosCompositeContinuationTokenVersion, + ResourceID: "differentContainerRID", + Continuation: []changeFeedRange{ + {MinInclusive: "00", MaxExclusive: "FF"}, + }, + } + tokenJSON, err := json.Marshal(mismatchToken) + require.NoError(t, err) + tokenStr := string(tokenJSON) + + _, err = container.GetChangeFeed(context.Background(), &ChangeFeedOptions{ + Continuation: &tokenStr, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "ResourceID", + "error message must call out ResourceID mismatch so customers can diagnose token-reuse bugs") + require.Contains(t, err.Error(), "differentContainerRID") + require.Contains(t, err.Error(), "testRID") +} + +// TestGetChangeFeed_NoOverlapAfterRefresh_ReturnsErrFeedRangeUnresolved — when +// the customer's FeedRange genuinely doesn't overlap any current physical range +// (e.g., wrong container, malformed range, container recreated under same name +// with different boundaries), even a forced cache refresh can't help. The error +// MUST be wrapped as ErrFeedRangeUnresolved so callers can detect-and-recover +// rather than seeing a generic opaque failure. +func TestGetChangeFeed_NoOverlapAfterRefresh_ReturnsErrFeedRangeUnresolved(t *testing.T) { + srv, closeSrv := mock.NewTLSServer() + defer closeSrv() + + // The cached routing map is complete (covers ["", "FF")), so the routing-map's + // own completeness check is satisfied. 
The customer's FeedRange ["GG", "HH") is + // entirely above the routed key space — a malformed/foreign range that no refresh + // can rescue. The cached map is already current, so the only HTTP traffic expected + // is the forced refresh queued below; no change-feed read should be sent. + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, nil, ranges) + + // The forced refresh hits these; we keep the same complete-but-non-overlapping range. + containerPropsResp := []byte(`{ + "id": "containerId", + "_rid": "testRID", + "_self": "dbs/db1/colls/containerId/", + "partitionKey": {"paths": ["/pk"], "kind": "Hash", "version": 2} + }`) + pkRangeResp := []byte(`{ + "_rid": "testRID", + "PartitionKeyRanges": [{"_rid": "testRID", "id": "0", "minInclusive": "", "maxExclusive": "FF"}], + "_count": 1 + }`) + srv.AppendResponse(mock.WithBody(containerPropsResp), mock.WithStatusCode(200)) + srv.AppendResponse(mock.WithBody(pkRangeResp), mock.WithStatusCode(200), + mock.WithHeader(cosmosHeaderEtag, "etag-refresh")) + srv.AppendResponse(mock.WithStatusCode(304)) + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + _, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{ + FeedRange: &FeedRange{MinInclusive: "GG", MaxExclusive: "HH"}, + }) + require.Error(t, err) + require.True(t, errors.Is(err, ErrFeedRangeUnresolved), + "unresolvable FeedRange MUST be wrapped as ErrFeedRangeUnresolved; got %T: %v", err, err) +} + +// TestGetChangeFeed_SplitExpansion_RoutesToFirstChild — when the customer's +// FeedRange overlaps multiple physical ranges (the post-split case), the loop +// must replace the head with one queue entry per child and route the actual +// HTTP call against the first child. The returned token must encode the full +// child queue so the next call drains the remaining children. 
+func TestGetChangeFeed_SplitExpansion_RoutesToFirstChild(t *testing.T) {
+	srv, closeSrv := mock.NewTLSServer()
+	defer closeSrv()
+
+	// Cache has two children where the customer's parent range used to be.
+	ranges := []partitionKeyRange{
+		{ID: "1a", MinInclusive: "00", MaxExclusive: "55", ResourceID: "testRID"},
+		{ID: "1b", MinInclusive: "55", MaxExclusive: "FF", ResourceID: "testRID"},
+	}
+	client := createChangeFeedTestClient(t, srv, nil, ranges)
+
+	cfBody := []byte(`{"_rid":"testRID","Documents":[{"id":"docFromFirstChild"}],"_count":1}`)
+	srv.AppendResponse(mock.WithBody(cfBody), mock.WithStatusCode(200),
+		mock.WithHeader(cosmosHeaderEtag, "\"etag-1a\""))
+
+	database, _ := newDatabase("databaseId", client)
+	container, _ := newContainer("containerId", database)
+
+	resp, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{
+		FeedRange: &FeedRange{MinInclusive: "00", MaxExclusive: "FF"},
+	})
+	require.NoError(t, err)
+	require.Equal(t, 1, resp.Count)
+	require.NotEmpty(t, resp.ContinuationToken)
+
+	// The persisted token must enumerate BOTH children so a subsequent call
+	// continues with 1b. After a successful 200 from 1a, advance() rotates 1a
+	// to the tail with its new ETag — so the queue is [1b, 1a-with-etag].
+	var token compositeContinuationToken
+	require.NoError(t, json.Unmarshal([]byte(resp.ContinuationToken), &token))
+	require.Len(t, token.Continuation, 2, "split expansion must surface both children in the continuation queue")
+	require.Equal(t, "55", token.Continuation[0].MinInclusive,
+		"after one successful 200, the new head must be the unread sibling (1b at [55, FF))")
+	require.Equal(t, "00", token.Continuation[1].MinInclusive,
+		"completed sub-range must be rotated to the tail with its updated ETag")
+	require.NotNil(t, token.Continuation[1].ContinuationToken, "tail entry must carry the freshly-issued ETag")
+	require.Equal(t, azcore.ETag("\"etag-1a\""), *token.Continuation[1].ContinuationToken)
+}
+
+// TestGetChangeFeed_MultiRangeContinuation_RoundTrips — a token issued by an
+// earlier call (multi-element queue) drives the next call against the queue's
+// head, not against options.FeedRange. This is the resume-after-split pattern.
+func TestGetChangeFeed_MultiRangeContinuation_RoundTrips(t *testing.T) {
+	srv, closeSrv := mock.NewTLSServer()
+	defer closeSrv()
+
+	ranges := []partitionKeyRange{
+		{ID: "1a", MinInclusive: "00", MaxExclusive: "55", ResourceID: "testRID"},
+		{ID: "1b", MinInclusive: "55", MaxExclusive: "FF", ResourceID: "testRID"},
+	}
+	client := createChangeFeedTestClient(t, srv, nil, ranges)
+
+	cfBody := []byte(`{"_rid":"testRID","Documents":[{"id":"docFromHead"}],"_count":1}`)
+	srv.AppendResponse(mock.WithBody(cfBody), mock.WithStatusCode(200),
+		mock.WithHeader(cosmosHeaderEtag, "\"new-etag\""))
+
+	database, _ := newDatabase("databaseId", client)
+	container, _ := newContainer("containerId", database)
+
+	// Customer-supplied resume token: queue head is the second child (the unfinished one).
+	headETag := azcore.ETag("\"prev-etag-1b\"")
+	tailETag := azcore.ETag("\"prev-etag-1a\"")
+	multiRangeToken := compositeContinuationToken{
+		Version:    cosmosCompositeContinuationTokenVersion,
+		ResourceID: "testRID",
+		Continuation: []changeFeedRange{
+			{MinInclusive: "55", MaxExclusive: "FF", ContinuationToken: &headETag},
+			{MinInclusive: "00", MaxExclusive: "55", ContinuationToken: &tailETag},
+		},
+	}
+	tokenJSON, err := json.Marshal(multiRangeToken)
+	require.NoError(t, err)
+	tokenStr := string(tokenJSON)
+
+	resp, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{
+		Continuation: &tokenStr,
+	})
+	require.NoError(t, err)
+	require.Equal(t, 1, resp.Count)
+
+	// After this call, the queue must rotate so the previously-tail entry is now the head.
+	var rt compositeContinuationToken
+	require.NoError(t, json.Unmarshal([]byte(resp.ContinuationToken), &rt))
+	require.Len(t, rt.Continuation, 2, "queue length must be preserved across calls")
+	require.Equal(t, "00", rt.Continuation[0].MinInclusive, "head should rotate to the unfinished sibling")
+	require.Equal(t, "55", rt.Continuation[1].MinInclusive, "completed entry should rotate to the tail")
+	require.NotNil(t, rt.Continuation[1].ContinuationToken)
+	require.Equal(t, azcore.ETag("\"new-etag\""), *rt.Continuation[1].ContinuationToken,
+		"tail entry must carry the freshly-issued ETag from this call's response")
+}
+
+// TestGetChangeFeed_SplitDuringDrain_QueriesEveryNewlyInsertedChild — regression
+// guard for a subtle accounting bug: when one of the queued sub-ranges splits
+// MID-drain (i.e., the cache learns about new children in the middle of a single
+// GetChangeFeed call's drain rotation), the rotation budget must be expanded to
+// account for the inserted siblings. Otherwise the loop bails early and silently
+// skips queryable ranges this call.
+//
+// Setup: two queued entries (A=[00,55), B=[55,FF)). The cache reflects a split
+// of B into B1=[55,AA) and B2=[AA,FF). The drain sequence MUST be:
+//
+//	A  → 304, rotate. Queue: [B, A]. budget consumed=1/2.
+//	B  → split detected, replaced with [B1, B2]. Queue: [B1, B2, A]. budget bumped to 3.
+//	B1 → 304, rotate. Queue: [B2, A, B1]. budget consumed=2/3.
+//	B2 → 304, rotate. Queue: [A, B1, B2]. budget consumed=3/3 → exit.
+//
+// Without the budget bump, the loop would break after B1 (rotations==2 ==
+// originalQueueLen==2) and never query B2 in this call. The test therefore
+// asserts that THREE distinct CF reads happened (one for A, one for B1,
+// one for B2), not two.
+func TestGetChangeFeed_SplitDuringDrain_QueriesEveryNewlyInsertedChild(t *testing.T) {
+	srv, closeSrv := mock.NewTLSServer()
+	defer closeSrv()
+
+	// The cache reflects the post-split topology: A is unchanged, B has split into B1+B2.
+	ranges := []partitionKeyRange{
+		{ID: "A", MinInclusive: "00", MaxExclusive: "55", ResourceID: "testRID"},
+		{ID: "B1", MinInclusive: "55", MaxExclusive: "AA", ResourceID: "testRID"},
+		{ID: "B2", MinInclusive: "AA", MaxExclusive: "FF", ResourceID: "testRID"},
+	}
+
+	// Count CF requests AND remember which PK-range-id was queried each time so we can
+	// assert B2 is actually visited in this single call (the regression target).
+	var cfCount atomic.Int32
+	queriedPKRangeIDs := make([]string, 0, 8)
+	var mu sync.Mutex
+	tracker := policyFunc(func(req *policy.Request) (*http.Response, error) {
+		if req.Raw().Header.Get(cosmosHeaderChangeFeed) == cosmosHeaderValuesChangeFeed &&
+			strings.HasSuffix(req.Raw().URL.Path, "/docs") {
+			cfCount.Add(1)
+			mu.Lock()
+			queriedPKRangeIDs = append(queriedPKRangeIDs, req.Raw().Header.Get(cosmosHeaderPartitionKeyRangeId))
+			mu.Unlock()
+		}
+		return req.Next()
+	})
+	client := createChangeFeedTestClient(t, srv, []policy.Policy{tracker}, ranges)
+
+	// All three sub-range reads return 304 (no new changes). The drain must hit every one.
+	srv.AppendResponse(mock.WithStatusCode(304), mock.WithHeader(cosmosHeaderEtag, "\"etag-A\""))
+	srv.AppendResponse(mock.WithStatusCode(304), mock.WithHeader(cosmosHeaderEtag, "\"etag-B1\""))
+	srv.AppendResponse(mock.WithStatusCode(304), mock.WithHeader(cosmosHeaderEtag, "\"etag-B2\""))
+
+	database, _ := newDatabase("databaseId", client)
+	container, _ := newContainer("containerId", database)
+
+	// Resume token preserves the original two-element queue (A, B). The drain must
+	// expand B → [B1, B2] mid-rotation and still query B2 within this call.
+	prevA := azcore.ETag("\"prev-A\"")
+	prevB := azcore.ETag("\"prev-B\"")
+	resumeToken := compositeContinuationToken{
+		Version:    cosmosCompositeContinuationTokenVersion,
+		ResourceID: "testRID",
+		Continuation: []changeFeedRange{
+			{MinInclusive: "00", MaxExclusive: "55", ContinuationToken: &prevA},
+			{MinInclusive: "55", MaxExclusive: "FF", ContinuationToken: &prevB},
+		},
+	}
+	tokenJSON, err := json.Marshal(resumeToken)
+	require.NoError(t, err)
+	tokenStr := string(tokenJSON)
+
+	_, err = container.GetChangeFeed(context.Background(), &ChangeFeedOptions{
+		Continuation: &tokenStr,
+	})
+	require.NoError(t, err)
+
+	// Snapshot the recorded IDs once, under the same lock the tracker policy
+	// writes with, and use that snapshot in every assertion below. Reading the
+	// shared slice directly in one assertion and under the lock in another is
+	// inconsistent and would be a data race if a request were still in flight.
+	mu.Lock()
+	got := append([]string(nil), queriedPKRangeIDs...)
+	mu.Unlock()
+
+	require.Equal(t, int32(3), cfCount.Load(),
+		"split-during-drain must query every newly-inserted child in the same call (got %d, want 3); queried IDs=%v",
+		cfCount.Load(), got)
+
+	// The actual order of visitation should be A, B1, B2 (FIFO with split-expand-on-head).
+	require.Equal(t, []string{"A", "B1", "B2"}, got,
+		"drain order must be A, B1, B2 (FIFO with split expansion on the head)")
+}
+
+// policyFunc adapts a function to policy.Policy so we can use closures inline.
+type policyFunc func(*policy.Request) (*http.Response, error)
+
+// Do implements policy.Policy by invoking the wrapped function.
+func (f policyFunc) Do(req *policy.Request) (*http.Response, error) { return f(req) }
+
+// TestClampChildrenToParent verifies the split-expansion clamping mirrors
+// Java's createChildRanges: each child's [Min, Max) is intersected with the
+// parent token's bounds, and children with empty intersection are dropped.
+//
+// Ranges are half-open, so a child that merely touches one of the parent's
+// bounds shares no keys with it; the two "touches" cases pin that boundary.
+func TestClampChildrenToParent(t *testing.T) {
+	parent := changeFeedRange{MinInclusive: "20", MaxExclusive: "60"}
+	children := []partitionKeyRange{
+		{ID: "left", MinInclusive: "00", MaxExclusive: "40"},       // overlaps lower half
+		{ID: "right", MinInclusive: "40", MaxExclusive: "FF"},      // overlaps upper half (clamped to "60")
+		{ID: "below", MinInclusive: "00", MaxExclusive: "10"},      // fully below parent → dropped
+		{ID: "above", MinInclusive: "80", MaxExclusive: "FF"},      // fully above parent → dropped
+		{ID: "touchesMin", MinInclusive: "10", MaxExclusive: "20"}, // ends exactly at parent.Min → empty intersection, dropped
+		{ID: "touchesMax", MinInclusive: "60", MaxExclusive: "80"}, // starts exactly at parent.Max → empty intersection, dropped
+	}
+
+	clamped := clampChildrenToParent(children, parent)
+	require.Len(t, clamped, 2, "non-overlapping children (including boundary-adjacent ones) must be dropped")
+	require.Equal(t, "left", clamped[0].ID)
+	require.Equal(t, "20", clamped[0].MinInclusive, "child Min must be raised to parent.Min")
+	require.Equal(t, "40", clamped[0].MaxExclusive)
+	require.Equal(t, "right", clamped[1].ID)
+	require.Equal(t, "40", clamped[1].MinInclusive)
+	require.Equal(t, "60", clamped[1].MaxExclusive, "child Max must be lowered to parent.Max")
+}
+
+// TestGetChangeFeed_ContextCancelled_HonoredBetweenSubRequests verifies that
+// a context cancelled mid-drain causes the loop to exit immediately on the
+// next iteration boundary rather than continuing to issue requests.
+func TestGetChangeFeed_ContextCancelled_HonoredBetweenSubRequests(t *testing.T) {
+	// Two scenarios share one setup:
+	//   - cancelAfter == 0: the context is already cancelled when GetChangeFeed is
+	//     called, so no change-feed request may be issued at all.
+	//   - cancelAfter == 1: the context is cancelled right after the first sub-request
+	//     completes, so the drain loop must stop at the next iteration boundary
+	//     instead of rotating on to the remaining heads.
+	cases := []struct {
+		name         string
+		cancelAfter  int32 // number of change-feed sub-requests allowed to complete before cancelling
+		wantRequests int32
+	}{
+		{name: "cancelled before first sub-request", cancelAfter: 0, wantRequests: 0},
+		{name: "cancelled after first sub-request", cancelAfter: 1, wantRequests: 1},
+	}
+
+	for _, tc := range cases {
+		tc := tc // per-iteration copy for the closure below (needed before Go 1.22)
+		t.Run(tc.name, func(t *testing.T) {
+			srv, closeSrv := mock.NewTLSServer()
+			defer closeSrv()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			// Count only document change-feed reads (same filter as the other
+			// trackers in this file) and cancel once the configured number of
+			// them has completed. The request is forwarded BEFORE cancelling so
+			// the sub-request itself finishes normally.
+			var requestCount atomic.Int32
+			cancelAfterN := policyFunc(func(req *policy.Request) (*http.Response, error) {
+				if req.Raw().Header.Get(cosmosHeaderChangeFeed) != cosmosHeaderValuesChangeFeed ||
+					!strings.HasSuffix(req.Raw().URL.Path, "/docs") {
+					return req.Next()
+				}
+				n := requestCount.Add(1)
+				resp, err := req.Next()
+				if n == tc.cancelAfter {
+					cancel()
+				}
+				return resp, err
+			})
+
+			// 3 physical ranges so the queue has 3 heads to drain.
+			ranges := []partitionKeyRange{
+				{ID: "0", MinInclusive: "", MaxExclusive: "55", ResourceID: "testRID"},
+				{ID: "1", MinInclusive: "55", MaxExclusive: "AA", ResourceID: "testRID"},
+				{ID: "2", MinInclusive: "AA", MaxExclusive: "FF", ResourceID: "testRID"},
+			}
+			client := createChangeFeedTestClient(t, srv, []policy.Policy{cancelAfterN}, ranges)
+
+			// Each head returns 304 so the loop would naturally rotate to the next head.
+			for i := 0; i < 5; i++ {
+				srv.AppendResponse(mock.WithStatusCode(304),
+					mock.WithHeader(cosmosHeaderEtag, "etag"))
+			}
+
+			database, _ := newDatabase("databaseId", client)
+			container, _ := newContainer("containerId", database)
+
+			if tc.cancelAfter == 0 {
+				cancel() // cancel immediately
+			}
+
+			_, err := container.GetChangeFeed(ctx, &ChangeFeedOptions{
+				FeedRange: &FeedRange{MinInclusive: "", MaxExclusive: "FF"},
+			})
+			require.Error(t, err)
+			require.ErrorIs(t, err, context.Canceled)
+			require.Equal(t, tc.wantRequests, requestCount.Load(),
+				"no further change-feed requests should issue once context is cancelled")
+		})
+	}
+}
+
+// TestGetChangeFeed_410BudgetExhaustedMidDrain_SurfacesPartialState verifies
+// the partial-state contract on 410-budget exhaustion: heads that successfully
+// 304'd before the failure are kept rotated in the returned token so the
+// caller can resume from the failed head instead of re-querying drained ones.
+func TestGetChangeFeed_410BudgetExhaustedMidDrain_SurfacesPartialState(t *testing.T) {
+	srv, closeSrv := mock.NewTLSServer()
+	defer closeSrv()
+
+	// Three physical ranges. The first head (range "0") succeeds with a 304;
+	// the second head (range "1") hits a persistent 410.
+	ranges := []partitionKeyRange{
+		{ID: "0", MinInclusive: "", MaxExclusive: "55", ResourceID: "testRID"},
+		{ID: "1", MinInclusive: "55", MaxExclusive: "AA", ResourceID: "testRID"},
+		{ID: "2", MinInclusive: "AA", MaxExclusive: "FF", ResourceID: "testRID"},
+	}
+
+	cfRequestN := atomic.Int32{}
+	policy410After1st := policyFunc(func(req *policy.Request) (*http.Response, error) {
+		// Only document change-feed reads are intercepted; everything else
+		// (e.g. the cache-refresh requests) goes to the mock server.
+		if req.Raw().Header.Get(cosmosHeaderChangeFeed) != cosmosHeaderValuesChangeFeed ||
+			!strings.HasSuffix(req.Raw().URL.Path, "/docs") {
+			return req.Next()
+		}
+		n := cfRequestN.Add(1)
+		// First CF request → 304. Subsequent → 410.
+		if n == 1 {
+			h := http.Header{}
+			h.Set(cosmosHeaderEtag, "etag-after-head0")
+			return &http.Response{
+				StatusCode: http.StatusNotModified,
+				Status:     "304 Not Modified",
+				Header:     h,
+				Body:       io.NopCloser(strings.NewReader("")),
+				Request:    req.Raw(),
+			}, nil
+		}
+		h := http.Header{}
+		h.Set(cosmosHeaderSubstatus, subStatusPartitionKeyRangeGone)
+		return &http.Response{
+			StatusCode: http.StatusGone,
+			Status:     "410 Gone",
+			Header:     h,
+			Body:       io.NopCloser(strings.NewReader(`{"message":"Gone"}`)),
+			Request:    req.Raw(),
+		}, nil
+	})
+
+	client := createChangeFeedTestClient(t, srv, []policy.Policy{policy410After1st}, ranges)
+
+	// Cache-refresh sequence for each 410 retry. After maxPKRangeGoneRetries
+	// refreshes, the next 410 surfaces with partial state.
+	pkRangeBody := []byte(`{
+		"_rid": "testRID",
+		"PartitionKeyRanges": [
+			{"_rid":"testRID","id":"0","minInclusive":"","maxExclusive":"55"},
+			{"_rid":"testRID","id":"1","minInclusive":"55","maxExclusive":"AA"},
+			{"_rid":"testRID","id":"2","minInclusive":"AA","maxExclusive":"FF"}
+		],
+		"_count": 3
+	}`)
+	containerPropsResp := []byte(`{
+		"id":"containerId",
+		"_rid":"testRID",
+		"_self":"dbs/db1/colls/containerId/",
+		"partitionKey":{"paths":["/pk"],"kind":"Hash","version":2}
+	}`)
+	for i := 0; i < maxPKRangeGoneRetries; i++ {
+		srv.AppendResponse(mock.WithBody(containerPropsResp), mock.WithStatusCode(200))
+		srv.AppendResponse(mock.WithBody(pkRangeBody), mock.WithStatusCode(200),
+			mock.WithHeader(cosmosHeaderEtag, "etag-refresh"))
+		srv.AppendResponse(mock.WithStatusCode(304))
+	}
+
+	database, _ := newDatabase("databaseId", client)
+	container, _ := newContainer("containerId", database)
+
+	resp, err := container.GetChangeFeed(context.Background(), &ChangeFeedOptions{
+		FeedRange: &FeedRange{MinInclusive: "", MaxExclusive: "FF"},
+	})
+	require.Error(t, err, "persistent 410 must surface to caller")
+	var respErr *azcore.ResponseError
+	require.ErrorAs(t, err, &respErr)
+	require.Equal(t, http.StatusGone, respErr.StatusCode)
+
+	// Partial state must be present: the rotated continuation token reflects
+	// head 0's successful 304 (its ETag should be on entry index 2 after rotation).
+	require.NotEmpty(t, resp.ContinuationToken,
+		"partial response must carry a rotated continuation token after 410-budget exhaustion")
+
+	// Decode the token and pin the rotation itself, not just its presence:
+	// the failed head stays at the front so the caller resumes from it, and
+	// the drained head sits at the tail carrying the ETag from its 304.
+	var partial compositeContinuationToken
+	require.NoError(t, json.Unmarshal([]byte(resp.ContinuationToken), &partial))
+	require.Len(t, partial.Continuation, 3, "queue length must be preserved in the partial token")
+	require.Equal(t, "55", partial.Continuation[0].MinInclusive,
+		"the head that exhausted the 410 budget must remain at the front of the queue")
+	drained := partial.Continuation[2]
+	require.Equal(t, "", drained.MinInclusive, "the drained head must have been rotated to the tail")
+	require.Equal(t, "55", drained.MaxExclusive)
+	require.NotNil(t, drained.ContinuationToken, "the drained head must carry the ETag from its 304")
+	require.Equal(t, azcore.ETag("etag-after-head0"), *drained.ContinuationToken)
+}
diff --git a/sdk/data/azcosmos/cosmos_container.go b/sdk/data/azcosmos/cosmos_container.go
index 7f7cbd96a55d..caa54ed5273d 100644
--- a/sdk/data/azcosmos/cosmos_container.go
+++ b/sdk/data/azcosmos/cosmos_container.go
@@ -5,7 +5,6 @@ package azcosmos
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 
@@ -816,8 +815,19 @@ func (c *ContainerClient) ExecuteTransactionalBatch(ctx context.Context, b Trans
 // GetChangeFeed retrieves a single page of the change feed using the provided options.
 // ctx - The context for the request.
 // options - Options for the operation
-// If options.FeedRange is set, it will retrieve the change feed for the specific range.
-// If options.Continuation contains a composite continuation token, it will extract the feed range from it.
+//
+// Routes via overlap-match against the current PK-range cache (split-aware). When
+// the customer's FeedRange straddles a split, it's expanded into one queue entry
+// per child so no events are missed at the boundary. 410/Gone responses with a
+// PK-range substatus trigger a cache refresh and bounded retry. The continuation
+// token is a multi-range composite; subsequent calls drain remaining ranges.
+//
+// Returns ErrFeedRangeUnresolved (wrapped) when the customer's FeedRange/token
+// doesn't overlap any current physical range even after a forced refresh — a
+// signal to re-derive FeedRanges from GetFeedRanges.
+//
+// Returns an error wrapping *azcore.ResponseError on persistent 410/Gone or any
+// non-retryable HTTP error.
func (c *ContainerClient) GetChangeFeed( ctx context.Context, options *ChangeFeedOptions, @@ -826,30 +836,6 @@ func (c *ContainerClient) GetChangeFeed( options = &ChangeFeedOptions{} } - if options.FeedRange == nil && options.Continuation != nil && *options.Continuation != "" { - var compositeToken compositeContinuationToken - if err := json.Unmarshal([]byte(*options.Continuation), &compositeToken); err == nil { - if len(compositeToken.Continuation) > 0 { - options.FeedRange = &FeedRange{ - MinInclusive: compositeToken.Continuation[0].MinInclusive, - MaxExclusive: compositeToken.Continuation[0].MaxExclusive, - } - } - } - } - - if options.FeedRange != nil { - return c.getChangeFeedForEPKRange(ctx, options.FeedRange, options) - } else { - return ChangeFeedResponse{}, fmt.Errorf("GetChangeFeed requires a FeedRange to be set in the options, or a continuation token that contains a composite continuation token") - } -} - -func (c *ContainerClient) getChangeFeedForEPKRange( - ctx context.Context, - feedRange *FeedRange, - options *ChangeFeedOptions, -) (ChangeFeedResponse, error) { var err error spanName, err := c.getSpanForItems(operationTypeRead) if err != nil { @@ -858,63 +844,22 @@ func (c *ContainerClient) getChangeFeedForEPKRange( ctx, endSpan := startSpan(ctx, spanName.name, c.database.client.internal.Tracer(), &spanName.options) defer func() { endSpan(err) }() - if options == nil { - options = &ChangeFeedOptions{} - } - - pkrResp, err := c.getPartitionKeyRanges(ctx, nil) - if err != nil { - return ChangeFeedResponse{}, err - } - partitionKeyRanges := pkrResp.PartitionKeyRanges - - var addHeaders func(*policy.Request) - headersPtr := options.toHeaders(partitionKeyRanges) - if headersPtr != nil { - headers := *headersPtr - addHeaders = func(r *policy.Request) { - for k, v := range headers { - r.Raw().Header.Set(k, v) - } - } - } - - h := headerOptionsOverride{ - priorityLevel: options.PriorityLevel, - throughputBucket: options.ThroughputBucket, - } - - 
operationContext := pipelineRequestOptions{ - resourceType: resourceTypeDocument, - resourceAddress: c.link, - headerOptionsOverride: &h, - } - - path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) + // Cross-container token guard. We only need the container's current + // ResourceID when a continuation token carries one to validate. Building + // the initial queue takes care of that lazily so the no-token path stays + // at one extra request (pk-ranges fetch). + token, partitionKeyRanges, err := c.buildChangeFeedInitialQueue(ctx, options) if err != nil { return ChangeFeedResponse{}, err } - azResponse, err := c.database.client.sendGetRequest( - path, - ctx, - operationContext, - nil, - addHeaders, - ) - if err != nil { - return ChangeFeedResponse{}, err - } - - response, err := newChangeFeedResponse(azResponse) - if err != nil { - return response, err - } - - response.FeedRange = feedRange - response.PopulateCompositeContinuationToken() - - return response, nil + // Capture into err so the deferred endSpan closure observes drain + // failures (410 budget exhaustion, send errors, refresh failures, + // serialization errors, ErrFeedRangeUnresolved) instead of recording + // success on every drain that actually failed. + var resp ChangeFeedResponse + resp, err = c.getChangeFeedForQueue(ctx, options, token, partitionKeyRanges) + return resp, err } func (c *ContainerClient) getRID(ctx context.Context) (string, error) { diff --git a/sdk/data/azcosmos/cosmos_container_change_feed.go b/sdk/data/azcosmos/cosmos_container_change_feed.go new file mode 100644 index 000000000000..c37dd1411a4d --- /dev/null +++ b/sdk/data/azcosmos/cosmos_container_change_feed.go @@ -0,0 +1,441 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+
+package azcosmos
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+	"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/internal/epk"
+)
+
+// buildChangeFeedInitialQueue assembles the queue this call's drain loop will
+// operate on, validating any provided continuation token against the current
+// container and resolving any provided FeedRange against the current PK
+// range cache. Returns the fetched PK-range snapshot alongside the token so
+// the drain loop can reuse it without re-fetching on every iteration; the
+// 410-retry path re-fetches on its own.
+//
+// A non-empty composite continuation token takes precedence over
+// options.FeedRange. When only a FeedRange is supplied, the queue gets one
+// entry per overlapping physical range, each clamped to the requested
+// FeedRange so a customer-supplied sub-range is never widened to the full
+// physical partition (the same clamping the drain loop applies on split
+// expansion).
+//
+// Returns (token, snapshot, nil) on success. Returns ErrFeedRangeUnresolved
+// (wrapped) when no overlap exists even after the cache is fresh. Returns a
+// plain error when the token's ResourceID does not match the container, or
+// when neither a usable token nor a FeedRange was supplied.
+func (c *ContainerClient) buildChangeFeedInitialQueue(
+	ctx context.Context,
+	options *ChangeFeedOptions,
+) (*compositeContinuationToken, []partitionKeyRange, error) {
+	// Path A: continuation token drives the queue.
+	if options.Continuation != nil && *options.Continuation != "" {
+		var compositeToken compositeContinuationToken
+		if err := json.Unmarshal([]byte(*options.Continuation), &compositeToken); err == nil && len(compositeToken.Continuation) > 0 {
+			// Reject cross-container token reuse loudly. Customers who hit this
+			// have either pasted the wrong token, dropped a container and
+			// recreated it under the same name, or fanned out a token to a
+			// different client. Continuing would route against the wrong map.
+			currentRID, ridErr := c.getContainerRID(ctx)
+			if ridErr != nil {
+				return nil, nil, ridErr
+			}
+			if compositeToken.ResourceID != "" && currentRID != "" && compositeToken.ResourceID != currentRID {
+				return nil, nil, fmt.Errorf(
+					"continuation token ResourceID %q does not match the current container's ResourceID %q; the token was issued for a different container",
+					compositeToken.ResourceID, currentRID,
+				)
+			}
+			// Work on a private copy of the queue so draining never mutates
+			// the slice decoded from the caller's token.
+			queue := append([]changeFeedRange(nil), compositeToken.Continuation...)
+			token := compositeToken
+			token.Continuation = queue
+			// Populate ResourceID at construction time so the cross-container
+			// guard remains meaningful even if the very first response is a
+			// 304 (no body parsed → response.ResourceID empty).
+			if token.ResourceID == "" {
+				token.ResourceID = currentRID
+			}
+
+			// Fetch a PK-range snapshot for the drain loop. Reused so the
+			// loop doesn't issue an extra request per iteration.
+			pkrResp, err := c.getPartitionKeyRanges(ctx, nil)
+			if err != nil {
+				return nil, nil, err
+			}
+			return &token, pkrResp.PartitionKeyRanges, nil
+		}
+		// options.Continuation was supplied but is not a multi-range composite
+		// token. Only composite tokens issued by this SDK's GetChangeFeed are
+		// honored on resume; legacy raw-ETag continuation strings are NOT
+		// supported and are silently dropped here. The FeedRange path below
+		// will start fresh.
+	}
+
+	// Path B: FeedRange drives the queue.
+	if options.FeedRange == nil {
+		return nil, nil, fmt.Errorf("GetChangeFeed requires a FeedRange to be set in the options, or a continuation token that contains a composite continuation token")
+	}
+
+	children, pkrs, err := c.resolveFeedRangeToChildren(ctx, *options.FeedRange)
+	if err != nil {
+		return nil, nil, err
+	}
+	// The overlapping ranges are full physical partitions. Narrow each one to
+	// the requested FeedRange before queueing so the persisted token (and any
+	// later split expansion, which clamps to the head's bounds) keeps the
+	// customer's original sub-range instead of silently widening it.
+	requested := changeFeedRange{
+		MinInclusive: options.FeedRange.MinInclusive,
+		MaxExclusive: options.FeedRange.MaxExclusive,
+	}
+	clamped := clampChildrenToParent(children, requested)
+	if len(clamped) == 0 {
+		// Defensive: overlap resolution and clamping disagreed. Treat as
+		// unresolvable rather than hand the drain loop an empty queue.
+		return nil, nil, &feedRangeUnresolvedError{feedRange: *options.FeedRange}
+	}
+	entries := buildChildQueueEntries(clamped, nil)
+	// Populate ResourceID at construction time so a token persisted from a
+	// 304-only first call still triggers the cross-container guard on resume.
+	currentRID, ridErr := c.getContainerRID(ctx)
+	if ridErr != nil {
+		return nil, nil, ridErr
+	}
+	token := compositeContinuationToken{
+		Version:      cosmosCompositeContinuationTokenVersion,
+		ResourceID:   currentRID,
+		Continuation: entries,
+	}
+	return &token, pkrs, nil
+}
+
+// resolveFeedRangeToChildren returns the routing-map ranges that overlap the
+// given customer-supplied FeedRange. On no-overlap, performs a single forced
+// refresh and retries; on still no overlap, returns ErrFeedRangeUnresolved.
+//
+// Also returns the PK-range snapshot it fetched, so the drain loop can reuse
+// it for the rest of this GetChangeFeed call.
+func (c *ContainerClient) resolveFeedRangeToChildren(
+	ctx context.Context,
+	feedRange FeedRange,
+) ([]partitionKeyRange, []partitionKeyRange, error) {
+	pkrResp, err := c.getPartitionKeyRanges(ctx, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	overlaps := overlappingPartitionKeyRanges(feedRange, pkrResp.PartitionKeyRanges)
+	if len(overlaps) > 0 {
+		return overlaps, pkrResp.PartitionKeyRanges, nil
+	}
+
+	// No overlap on the cached map. Try a forced refresh once if a cache
+	// exists; if still no overlap, the customer's FeedRange genuinely doesn't
+	// apply to this container.
+	if c.database.client.getPKRangeCache() != nil {
+		if refreshErr := c.refreshPKRangeCache(ctx); refreshErr != nil {
+			return nil, nil, refreshErr
+		}
+		pkrResp, err = c.getPartitionKeyRanges(ctx, nil)
+		if err != nil {
+			return nil, nil, err
+		}
+		overlaps = overlappingPartitionKeyRanges(feedRange, pkrResp.PartitionKeyRanges)
+		if len(overlaps) > 0 {
+			return overlaps, pkrResp.PartitionKeyRanges, nil
+		}
+	}
+
+	return nil, nil, &feedRangeUnresolvedError{feedRange: feedRange}
+}
+
+// buildChildQueueEntries materializes []changeFeedRange entries for each
+// child range, copying the inheritETag pointer onto every child so no events
+// are skipped at the split boundary. inheritETag may be nil for fresh ranges
+// that have never been read.
+func buildChildQueueEntries(children []partitionKeyRange, inheritETag *azcore.ETag) []changeFeedRange {
+	// One queue entry per child, carrying only the child's EPK bounds.
+	entries := make([]changeFeedRange, len(children))
+	for i := range children {
+		entries[i].MinInclusive = children[i].MinInclusive
+		entries[i].MaxExclusive = children[i].MaxExclusive
+		if inheritETag == nil {
+			continue
+		}
+		// Give every entry its own copy of the ETag so the entries never
+		// alias the caller's value or each other.
+		inherited := *inheritETag
+		entries[i].ContinuationToken = &inherited
+	}
+	return entries
+}
+
+// clampChildrenToParent intersects every child's [Min, Max) with the parent
+// head's [Min, Max) and returns the narrowed children in their original
+// order, so split-expansion keeps the customer's original sub-range intent.
+// A child whose intersection with the parent is empty is left out. Max
+// bounds are compared (and returned) in normalizeMaxBoundary form. Mirrors
+// Java's FeedRangeCompositeContinuationImpl.createChildRanges clamping behavior.
+func clampChildrenToParent(children []partitionKeyRange, parent changeFeedRange) []partitionKeyRange {
+	lowerBound := parent.MinInclusive
+	upperBound := normalizeMaxBoundary(parent.MaxExclusive)
+
+	kept := make([]partitionKeyRange, 0, len(children))
+	for i := range children {
+		lo, hi := children[i].MinInclusive, normalizeMaxBoundary(children[i].MaxExclusive)
+		// Raise the lower edge to the parent's when the child starts before it.
+		if epk.CompareEPK(lowerBound, lo) > 0 {
+			lo = lowerBound
+		}
+		// Lower the upper edge to the parent's when the child ends after it.
+		if epk.CompareEPK(upperBound, hi) < 0 {
+			hi = upperBound
+		}
+		// Keep only children that still cover a non-empty half-open range.
+		if epk.CompareEPK(lo, hi) < 0 {
+			narrowed := children[i]
+			narrowed.MinInclusive, narrowed.MaxExclusive = lo, hi
+			kept = append(kept, narrowed)
+		}
+	}
+	return kept
+}
+
+// getChangeFeedForQueue drains the queue, advancing on every response (200 or
+// 304). On 200 with documents, returns immediately so the caller can process
+// the page; on 304, rotates and tries the next entry until the original queue
+// length is fully consumed (with budget bumps on splits).
On 410, refreshes +// the cache, re-resolves the head, and retries — capped at maxPKRangeGoneRetries. +// +// partitionKeyRanges is the snapshot fetched once at the start of the call; +// the loop reuses it instead of re-fetching per iteration. The 410-retry path +// re-fetches and replaces the snapshot. +// +// RequestCharge is aggregated across all iterations so the returned response +// reports total RU consumed by the call (matching cosmos_container_read_many). +// +// On a 410-budget exhaustion mid-drain, the partial response (with the queue +// state rotated past every successfully-drained head) is returned alongside +// the error so callers can resume from where the drain failed instead of +// re-querying already-drained heads. +func (c *ContainerClient) getChangeFeedForQueue( + ctx context.Context, + options *ChangeFeedOptions, + token *compositeContinuationToken, + partitionKeyRanges []partitionKeyRange, +) (ChangeFeedResponse, error) { + if token == nil || len(token.Continuation) == 0 { + return ChangeFeedResponse{}, fmt.Errorf("GetChangeFeed has nothing to drain: no FeedRange and no continuation token entries") + } + + // Drain budget: how many rotations we'll perform before we give up and + // return an empty page so the caller can poll again. Starts at the queue + // length and grows whenever a split-expansion inserts children. + originalQueueLen := len(token.Continuation) + // Hard cap on the rotation budget — defends against bugs in overlap + // resolution (e.g., duplicate ranges in the snapshot) that would cause + // repeated split-expansion on the same head and an unbounded drain. + // Mirrors Java's FeedRangeCompositeContinuationImpl ~4×(N+1) defense. + maxQueueLen := 4 * (originalQueueLen + 1) + rotations := 0 + pkRangeGoneAttempts := 0 + + var lastResp ChangeFeedResponse + var totalRequestCharge float32 + + // finalize attaches the rotated continuation token and the aggregated + // request charge to the given response. 
Used both on success and on + // the 410-budget-exhausted partial-state return path. + finalize := func(r ChangeFeedResponse) (ChangeFeedResponse, error) { + serialized, serErr := serializeCompositeContinuationToken(token) + if serErr != nil { + return r, serErr + } + r.ContinuationToken = serialized + r.RequestCharge = totalRequestCharge + return r, nil + } + + for rotations < originalQueueLen { + // Honor caller cancellation between sub-requests so a long drain + // doesn't keep working past a deadline / explicit cancel. + select { + case <-ctx.Done(): + return ChangeFeedResponse{}, ctx.Err() + default: + } + + head := token.head() + if head == nil { + break + } + + // Resolve the head's EPK range to a single PK-range ID against the + // current routing-map snapshot. + headFeedRange := FeedRange{MinInclusive: head.MinInclusive, MaxExclusive: head.MaxExclusive} + overlaps := overlappingPartitionKeyRanges(headFeedRange, partitionKeyRanges) + if len(overlaps) == 0 { + // No overlap on the cached map. Force a refresh and re-fetch; if + // still no overlap, the head is unresolvable. + if c.database.client.getPKRangeCache() != nil { + if refreshErr := c.refreshPKRangeCache(ctx); refreshErr != nil { + return ChangeFeedResponse{}, refreshErr + } + } + pkrResp, err := c.getPartitionKeyRanges(ctx, nil) + if err != nil { + return ChangeFeedResponse{}, err + } + partitionKeyRanges = pkrResp.PartitionKeyRanges + overlaps = overlappingPartitionKeyRanges(headFeedRange, partitionKeyRanges) + if len(overlaps) == 0 { + return ChangeFeedResponse{}, &feedRangeUnresolvedError{feedRange: headFeedRange} + } + } + + var resolvedPKRangeID string + if len(overlaps) > 1 { + // Split-expansion. Replace the head with N children inheriting + // the head's ETag, and bump the rotation budget so newly-inserted + // children get visited in this call. Reset the 410 budget too — + // each newly-inserted child is a fresh physical head and deserves + // its own retry allowance. 
Children are clamped to the parent + // head's bounds to preserve the customer's original sub-range + // intent (matches Java's createChildRanges behavior). + clamped := clampChildrenToParent(overlaps, *head) + if len(clamped) == 0 { + // Defensive: every overlap fell outside the parent. Treat as + // unresolvable rather than spin in an empty loop. + return ChangeFeedResponse{}, &feedRangeUnresolvedError{feedRange: headFeedRange} + } + children := buildChildQueueEntries(clamped, head.ContinuationToken) + token.replaceHeadWithChildren(children) + originalQueueLen += len(children) - 1 + if originalQueueLen > maxQueueLen { + // Hit the safety cap. This shouldn't happen in practice + // (bounded by the service's physical-partition limit), but a + // bug in overlap resolution producing duplicate children + // would otherwise spin forever. Return what we have so far. + if lastResp.RawResponse != nil { + return finalize(lastResp) + } + return ChangeFeedResponse{}, fmt.Errorf("GetChangeFeed: drain queue exceeded safety cap of %d entries; suspected overlap-resolution bug", maxQueueLen) + } + pkRangeGoneAttempts = 0 + continue + } + resolvedPKRangeID = overlaps[0].ID + + headers, headerErr := options.buildRequestHeaders(*head, resolvedPKRangeID) + if headerErr != nil { + return ChangeFeedResponse{}, headerErr + } + + addHeaders := func(r *policy.Request) { + for k, v := range headers { + r.Raw().Header.Set(k, v) + } + } + + operationContext := pipelineRequestOptions{ + resourceType: resourceTypeDocument, + resourceAddress: c.link, + headerOptionsOverride: &headerOptionsOverride{ + priorityLevel: options.PriorityLevel, + throughputBucket: options.ThroughputBucket, + }, + } + path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) + if err != nil { + return ChangeFeedResponse{}, err + } + + azResponse, sendErr := c.database.client.sendGetRequest( + path, ctx, operationContext, nil, addHeaders, + ) + if sendErr != nil { + // 410/Gone 
with a PK-range substatus → refresh + retry. + if isPKRangeGoneResponseError(sendErr) { + if pkRangeGoneAttempts >= maxPKRangeGoneRetries { + // Surface partial drain progress alongside the error so + // the caller can resume from the failed head instead of + // re-querying already-drained heads. lastResp may be + // zero-value if no head succeeded yet — in that case the + // caller still sees the original token state. + if lastResp.RawResponse != nil { + partial, _ := finalize(lastResp) + return partial, sendErr + } + return ChangeFeedResponse{}, sendErr + } + pkRangeGoneAttempts++ + if refreshErr := c.refreshPKRangeCache(ctx); refreshErr != nil { + return ChangeFeedResponse{}, refreshErr + } + // Re-fetch the routing map after the cache was invalidated. + pkrResp, fetchErr := c.getPartitionKeyRanges(ctx, nil) + if fetchErr != nil { + return ChangeFeedResponse{}, fetchErr + } + partitionKeyRanges = pkrResp.PartitionKeyRanges + // Retry the same head against the refreshed snapshot. + continue + } + return ChangeFeedResponse{}, sendErr + } + + response, err := newChangeFeedResponse(azResponse) + if err != nil { + return response, err + } + + // Aggregate RU charge across every sub-request so the returned + // response reports the true total cost of the drain (single-iter + // callers are unaffected; multi-range drains stop under-reporting). + totalRequestCharge += response.RequestCharge + + // Capture the response body's _rid into the token's ResourceID on first + // successful response. This keeps the cross-container guard meaningful + // across resume — token-issued-by-this-container always carries the + // container's RID — and matches pre-F1 PopulateCompositeContinuationToken + // semantics that downstream tests rely on. + if token.ResourceID == "" && response.ResourceID != "" { + token.ResourceID = response.ResourceID + } + + // Always rotate the head with the freshly-issued ETag, regardless of + // status. This preserves drain progress even across 304s. 
+ newETag := response.ETag + feedRangeForResp := &FeedRange{MinInclusive: head.MinInclusive, MaxExclusive: head.MaxExclusive} + token.advance(newETag) + rotations++ + // Head advanced to a new physical range; reset the 410 budget so the + // next head gets its own allowance instead of inheriting prior 410s. + pkRangeGoneAttempts = 0 + + response.FeedRange = feedRangeForResp + lastResp = response + + // 200 with a non-empty page → return immediately so the caller can + // process. The Documents-length belt covers the (shouldn't-happen) + // case where the server omits _count but still ships docs. + if response.RawResponse != nil && + response.RawResponse.StatusCode == http.StatusOK && + (response.Count > 0 || len(response.Documents) > 0) { + return finalize(response) + } + + // 304 (or 200 with zero documents) → keep draining the rest of the queue. + } + + // Whole queue drained without finding documents. Return the last (empty) + // response with the rotated continuation token so the caller knows the + // drain progressed and can poll again later. + if lastResp.RawResponse == nil { + // Nothing was issued (queue was empty). Synthesize an empty response. + return ChangeFeedResponse{}, nil + } + return finalize(lastResp) +} + +// serializeCompositeContinuationToken marshals the token as JSON for emission +// to the customer. Returns "" if the token is nil or has an empty queue. Side effect: a zero token.Version is set to cosmosCompositeContinuationTokenVersion on the passed-in token before marshaling.
+func serializeCompositeContinuationToken(token *compositeContinuationToken) (string, error) { + if token == nil || len(token.Continuation) == 0 { + return "", nil + } + if token.Version == 0 { + token.Version = cosmosCompositeContinuationTokenVersion + } + b, err := json.Marshal(token) + if err != nil { + return "", err + } + return string(b), nil +} diff --git a/sdk/data/azcosmos/cosmos_container_test.go b/sdk/data/azcosmos/cosmos_container_test.go index 5760351de29f..5e0e3d1adcba 100644 --- a/sdk/data/azcosmos/cosmos_container_test.go +++ b/sdk/data/azcosmos/cosmos_container_test.go @@ -826,11 +826,10 @@ func TestContainerReadPartitionKeyRangesEmpty(t *testing.T) { func TestContainerGetChangeFeedWithStartFrom(t *testing.T) { changeFeedBody := []byte( - `{"_rid":"test-rid", + `{"_rid":"testRID", "Documents":[{"id":"doc1"},{"id":"doc2"}], "_count":2}`) srv, close := mock.NewTLSServer() - defaultEndpoint, _ := url.Parse(srv.URL()) defer close() srv.SetResponse( mock.WithBody(changeFeedBody), @@ -840,9 +839,8 @@ func TestContainerGetChangeFeedWithStartFrom(t *testing.T) { mock.WithStatusCode(200)) verifier := pipelineVerifier{} - internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) - gem := &globalEndpointManager{preferredLocations: []string{}} - client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{&verifier}, ranges) database, _ := newDatabase("databaseId", client) container, _ := newContainer("containerId", database) @@ -861,8 +859,8 @@ func TestContainerGetChangeFeedWithStartFrom(t *testing.T) { if err != nil { t.Fatalf("GetChangeFeed returned error: %v", err) } - if resp.ResourceID != "test-rid" { - t.Errorf("Expected ResourceID 
'test-rid', got %v", resp.ResourceID) + if resp.ResourceID != "testRID" { + t.Errorf("Expected ResourceID 'testRID', got %v", resp.ResourceID) } if resp.Count != 2 { t.Errorf("Expected Count 2, got %v", resp.Count) @@ -871,11 +869,12 @@ func TestContainerGetChangeFeedWithStartFrom(t *testing.T) { t.Errorf("Expected 2 documents, got %v", len(resp.Documents)) } - if len(verifier.requests) != 2 { - t.Fatalf("Expected 2 requests, got %d", len(verifier.requests)) + // With caches pre-wired, only the change-feed request hits the wire. + if len(verifier.requests) != 1 { + t.Fatalf("Expected 1 request, got %d", len(verifier.requests)) } - request := verifier.requests[1] + request := verifier.requests[0] ifModifiedSinceHeader := request.headers.Get(cosmosHeaderIfModifiedSince) expectedIfModifiedSince := modifiedSince.Format(time.RFC1123) @@ -893,7 +892,7 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { // First response: All documents when using beginning of time filter allDocumentsBody := []byte(`{ - "_rid": "test-rid", + "_rid": "testRID", "Documents": [ {"id": "doc1", "_ts": 1730000000}, {"id": "doc2", "_ts": 1735000000}, @@ -904,7 +903,7 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { // Second response: Only documents after the filter time filteredDocumentsBody := []byte(`{ - "_rid": "test-rid", + "_rid": "testRID", "Documents": [ {"id": "doc3", "_ts": 1740000000} ], @@ -912,7 +911,6 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { }`) srv, close := mock.NewTLSServer() - defaultEndpoint, _ := url.Parse(srv.URL()) defer close() // Set up mock responses @@ -924,9 +922,8 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { mock.WithStatusCode(200)) verifier := pipelineVerifier{} - internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) - gem := 
&globalEndpointManager{preferredLocations: []string{}} - client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{&verifier}, ranges) database, _ := newDatabase("databaseId", client) container, _ := newContainer("containerId", database) @@ -970,11 +967,12 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { } } - if len(verifier.requests) < 2 { - t.Fatalf("Expected at least 2 requests, got %d", len(verifier.requests)) + // With caches pre-wired, only the change-feed request hits the wire. + if len(verifier.requests) < 1 { + t.Fatalf("Expected at least 1 request, got %d", len(verifier.requests)) } - firstRequest := verifier.requests[1] + firstRequest := verifier.requests[0] firstIfModifiedSinceHeader := firstRequest.headers.Get(cosmosHeaderIfModifiedSince) firstExpectedIfModifiedSince := beginningOfTime.Format(time.RFC1123) @@ -994,8 +992,7 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { mock.WithStatusCode(200)) verifier = pipelineVerifier{} - internalClient, _ = azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) - client = &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem} + client = createChangeFeedTestClient(t, srv, []policy.Policy{&verifier}, ranges) database, _ = newDatabase("databaseId", client) container, _ = newContainer("containerId", database) @@ -1026,11 +1023,11 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { t.Errorf("Expected filtered document to have ID 'doc3', got '%s'", filteredDoc["id"]) } - if len(verifier.requests) < 2 { - t.Fatalf("Expected at least 2 requests in second test, got %d", len(verifier.requests)) + if 
len(verifier.requests) < 1 { + t.Fatalf("Expected at least 1 request in second test, got %d", len(verifier.requests)) } - secondRequest := verifier.requests[1] + secondRequest := verifier.requests[0] secondIfModifiedSinceHeader := secondRequest.headers.Get(cosmosHeaderIfModifiedSince) secondExpectedIfModifiedSince := midpointTime.Format(time.RFC1123) @@ -1044,34 +1041,15 @@ func TestContainerGetChangeFeedWithStartFromFiltering(t *testing.T) { func TestContainerGetChangeFeedForEPKRange(t *testing.T) { changeFeedBody := []byte(`{ - "_rid": "test-resource-id", + "_rid": "testRID", "Documents": [{"id": "doc1"}, {"id": "doc2"}], "_count": 2 }`) - pkRangesBody := []byte(`{ - "_rid": "test-resource-id", - "PartitionKeyRanges": [{ - "_rid": "range-rid", - "id": "0", - "minInclusive": "00", - "maxExclusive": "FF" - }], - "_count": 1 - }`) - srv, close := mock.NewTLSServer() - defaultEndpoint, _ := url.Parse(srv.URL()) defer close() - // First response should be for the partition key ranges request - srv.AppendResponse( - mock.WithBody(pkRangesBody), - mock.WithHeader(cosmosHeaderActivityId, "pkRangesActivityId"), - mock.WithHeader(cosmosHeaderRequestCharge, "1.0"), - mock.WithStatusCode(200)) - - // Second response should be for the change feed request + // Only the change-feed request hits the wire — caches are pre-wired. 
srv.AppendResponse( mock.WithBody(changeFeedBody), mock.WithHeader(cosmosHeaderEtag, "\"etag-12345\""), @@ -1080,9 +1058,8 @@ func TestContainerGetChangeFeedForEPKRange(t *testing.T) { mock.WithStatusCode(200)) verifier := pipelineVerifier{} - internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) - gem := &globalEndpointManager{preferredLocations: []string{}} - client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "00", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{&verifier}, ranges) database, _ := newDatabase("databaseId", client) container, _ := newContainer("containerId", database) @@ -1100,8 +1077,8 @@ func TestContainerGetChangeFeedForEPKRange(t *testing.T) { t.Fatalf("GetChangeFeedForEPKRange failed: %v", err) } - if resp.ResourceID != "test-resource-id" { - t.Errorf("unexpected ResourceID: got %q, want %q", resp.ResourceID, "test-resource-id") + if resp.ResourceID != "testRID" { + t.Errorf("unexpected ResourceID: got %q, want %q", resp.ResourceID, "testRID") } if resp.Count != 2 { @@ -1112,25 +1089,13 @@ func TestContainerGetChangeFeedForEPKRange(t *testing.T) { t.Errorf("unexpected number of Documents: got %d, want 2", len(resp.Documents)) } - if len(verifier.requests) != 2 { - t.Fatalf("Expected exactly 2 requests (partition key ranges and change feed), got %d", len(verifier.requests)) - } - - // First request should be to get partition key ranges - pkRangesRequest := verifier.requests[0] - if !strings.Contains(pkRangesRequest.url.Path, "pkranges") { - t.Errorf("Expected first request to be for partition key ranges, got URL path: %s", pkRangesRequest.url.Path) - } - expectedPkRangesPath := "/dbs/databaseId/colls/containerId/pkranges" - if 
!strings.Contains(pkRangesRequest.url.Path, expectedPkRangesPath) { - t.Errorf("Expected partition key ranges path to contain %s, got %s", - expectedPkRangesPath, pkRangesRequest.url.Path) + if len(verifier.requests) != 1 { + t.Fatalf("Expected exactly 1 request (change feed; pkranges served from cache), got %d", len(verifier.requests)) } - // Second request should be the change feed request - changeFeedRequest := verifier.requests[1] + changeFeedRequest := verifier.requests[0] if !strings.Contains(changeFeedRequest.url.Path, "/docs") { - t.Errorf("Expected second request to be for documents, got URL path: %s", changeFeedRequest.url.Path) + t.Errorf("Expected request to be for documents, got URL path: %s", changeFeedRequest.url.Path) } pkRangeHeader := changeFeedRequest.headers.Get(headerXmsDocumentDbPartitionKeyRangeId) @@ -1159,9 +1124,11 @@ func TestContainerGetChangeFeedForEPKRange(t *testing.T) { compositeToken.Version, cosmosCompositeContinuationTokenVersion) } - if compositeToken.ResourceID != "test-resource-id" { + // Token's ResourceID is now populated at construction time from the + // container's RID ("testRID" per createChangeFeedTestClient). 
+ if compositeToken.ResourceID != "testRID" { t.Errorf("unexpected ResourceID in composite token: got %q, want %q", - compositeToken.ResourceID, "test-resource-id") + compositeToken.ResourceID, "testRID") } if len(compositeToken.Continuation) != 1 { @@ -1187,28 +1154,6 @@ func TestContainerGetChangeFeedForEPKRange(t *testing.T) { t.Errorf("unexpected ContinuationToken: got %q, want %q", *compositeToken.Continuation[0].ContinuationToken, "\"etag-12345\"") } - - // Now test using the continuation token in a subsequent request - options2 := &ChangeFeedOptions{ - MaxItemCount: 10, - Continuation: &resp.ContinuationToken, - } - - headers := options2.toHeaders(nil) - if headers == nil { - t.Fatal("expected headers to be non-nil") - } - - h := *headers - if h[headerIfNoneMatch] != "\"etag-12345\"" { - t.Errorf("unexpected IfNoneMatch header: got %q, want %q", - h[headerIfNoneMatch], "\"etag-12345\"") - } - - if h[cosmosHeaderChangeFeed] != cosmosHeaderValuesChangeFeed { - t.Errorf("unexpected ChangeFeed header in continuation request: got %q, want %q", - h[cosmosHeaderChangeFeed], cosmosHeaderValuesChangeFeed) - } } func TestCreateItemPriorityAndThroughputBucketHeaders(t *testing.T) { @@ -1557,31 +1502,15 @@ func TestTransactionalBatchPriorityAndThroughputBucketHeaders(t *testing.T) { func TestChangeFeedPriorityAndThroughputBucketHeaders(t *testing.T) { changeFeedBody := []byte(`{ - "_rid": "test-resource-id", + "_rid": "testRID", "Documents": [{"id": "doc1"}], "_count": 1 }`) - pkRangesBody := []byte(`{ - "_rid": "test-resource-id", - "PartitionKeyRanges": [{ - "_rid": "range-rid", - "id": "0", - "minInclusive": "00", - "maxExclusive": "FF" - }], - "_count": 1 - }`) - srv, close := mock.NewTLSServer() - defaultEndpoint, _ := url.Parse(srv.URL()) defer close() - srv.AppendResponse( - mock.WithBody(pkRangesBody), - mock.WithHeader(cosmosHeaderActivityId, "pkRangesActivityId"), - mock.WithHeader(cosmosHeaderRequestCharge, "1.0"), - mock.WithStatusCode(200)) + // Only the 
change-feed request hits the wire — caches are pre-wired. srv.AppendResponse( mock.WithBody(changeFeedBody), mock.WithHeader(cosmosHeaderEtag, "\"etag-12345\""), @@ -1591,10 +1520,8 @@ func TestChangeFeedPriorityAndThroughputBucketHeaders(t *testing.T) { verifier := pipelineVerifier{} headerPolicy := &headerPolicies{} - - internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{headerPolicy, &verifier}}, &policy.ClientOptions{Transport: srv}) - gem := &globalEndpointManager{preferredLocations: []string{}} - client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem} + ranges := []partitionKeyRange{{ID: "0", MinInclusive: "00", MaxExclusive: "FF", ResourceID: "testRID"}} + client := createChangeFeedTestClient(t, srv, []policy.Policy{headerPolicy, &verifier}, ranges) database, _ := newDatabase("databaseId", client) container, _ := newContainer("containerId", database) @@ -1614,8 +1541,7 @@ func TestChangeFeedPriorityAndThroughputBucketHeaders(t *testing.T) { t.Fatalf("GetChangeFeed failed: %v", err) } - // The second request is the change feed request (first is pk ranges) - h := verifier.requests[1].headers + h := verifier.requests[0].headers if h.Get(cosmosHeaderPriorityLevel) != "High" { t.Errorf("Expected priority level header to be High, got %v", h.Get(cosmosHeaderPriorityLevel)) } diff --git a/sdk/data/azcosmos/cosmos_feed_range.go b/sdk/data/azcosmos/cosmos_feed_range.go index 8658dc577a83..8e8044a135e2 100644 --- a/sdk/data/azcosmos/cosmos_feed_range.go +++ b/sdk/data/azcosmos/cosmos_feed_range.go @@ -3,7 +3,37 @@ package azcosmos -import "fmt" +import ( + "errors" + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/internal/epk" +) + +// ErrFeedRangeUnresolved is returned when a customer-supplied FeedRange does not +// overlap any current physical partition key range, even after a forced cache +// refresh. 
Callers can use errors.Is to detect this and fall back to +// re-deriving FeedRanges from GetFeedRanges. +// +// This typically indicates one of: +// - The FeedRange was constructed for a different container. +// - The container was deleted and recreated with different partitioning. +// - The FeedRange boundaries are malformed (outside [00, FF) or empty range). +var ErrFeedRangeUnresolved = errors.New("feed range did not overlap any current partition key range") + +// feedRangeUnresolvedError wraps ErrFeedRangeUnresolved with diagnostic detail +// about the unresolvable FeedRange so customers can identify which range failed. +type feedRangeUnresolvedError struct { + feedRange FeedRange +} + +func (e *feedRangeUnresolvedError) Error() string { + return fmt.Sprintf("%s: [%s, %s)", ErrFeedRangeUnresolved.Error(), e.feedRange.MinInclusive, e.feedRange.MaxExclusive) +} + +func (e *feedRangeUnresolvedError) Unwrap() error { + return ErrFeedRangeUnresolved +} // FeedRange represents a range of partition key values for a Cosmos container. // It is used to identify a specific range of documents for change feed processing. @@ -22,13 +52,56 @@ func NewFeedRange(minInclusive, maxExclusive string) FeedRange { } } -// findPartitionKeyRangeID finds the partition key range ID that matches the given FeedRange. -// Returns the ID if found, or an error if no match exists. -func findPartitionKeyRangeID(feedRange FeedRange, partitionKeyRanges []partitionKeyRange) (string, error) { +// overlappingPartitionKeyRanges returns the subset of partitionKeyRanges whose +// boundaries overlap the given feedRange, preserving input order. Returns nil +// on no overlap (no error). +// +// Note: O(n) linear scan. For routing-map-backed lookups, prefer +// collectionRoutingMap.getOverlappingRanges (binary search). This helper exists +// for paths that operate on a flat snapshot returned by getPartitionKeyRanges. 
+func overlappingPartitionKeyRanges(feedRange FeedRange, partitionKeyRanges []partitionKeyRange) []partitionKeyRange { + if len(partitionKeyRanges) == 0 { + return nil + } + + feedMin := feedRange.MinInclusive + feedMax := normalizeMaxBoundary(feedRange.MaxExclusive) + if epk.CompareEPK(feedMin, feedMax) >= 0 { + return nil + } + + var out []partitionKeyRange // nil until a range overlaps, so a no-overlap scan returns nil as documented for _, pkr := range partitionKeyRanges { - if feedRange.MinInclusive == pkr.MinInclusive && feedRange.MaxExclusive == pkr.MaxExclusive { - return pkr.ID, nil + if rangesOverlap(feedMin, feedMax, pkr.MinInclusive, normalizeMaxBoundary(pkr.MaxExclusive)) { + out = append(out, pkr) } } - return "", fmt.Errorf("no matching partition key range found for feed range [%s, %s)", feedRange.MinInclusive, feedRange.MaxExclusive) + return out +} + +// rangesOverlap reports whether [aMin, aMax) intersects [bMin, bMax). +// All four boundaries must be normalized hex EPK strings (with "FF" used +// for the upper sentinel rather than ""); rangesOverlap does NOT treat +// empty strings as open boundaries. +func rangesOverlap(aMin, aMax, bMin, bMax string) bool { + // Standard half-open interval overlap test: + // intersection is empty iff aMax <= bMin or bMax <= aMin. + if epk.CompareEPK(aMax, bMin) <= 0 { + return false + } + if epk.CompareEPK(bMax, aMin) <= 0 { + return false + } + return true +} + +// normalizeMaxBoundary converts the open-top "" sentinel to "FF" so length-aware +// EPK comparisons against finite ranges produce sensible answers. PK range +// snapshots from the service usually use "FF" but customer-supplied FeedRanges +// often use "". Min boundaries are NOT normalized — "" already sorts lowest.
+func normalizeMaxBoundary(maxExclusive string) string { + if maxExclusive == "" { + return "FF" + } + return maxExclusive } diff --git a/sdk/data/azcosmos/cosmos_feed_range_test.go b/sdk/data/azcosmos/cosmos_feed_range_test.go index 8f6f82f7f8ba..ae0582369023 100644 --- a/sdk/data/azcosmos/cosmos_feed_range_test.go +++ b/sdk/data/azcosmos/cosmos_feed_range_test.go @@ -234,3 +234,147 @@ func TestContainerGetFeedRanges_UsesCache(t *testing.T) { require.Equal(t, requestsAfterFirstCall, srv.Requests(), "second call should make 0 HTTP requests (cache hit)") } + +// TestNormalizeMaxBoundary verifies the open-top "" sentinel becomes "FF", +// matching the Cosmos convention used everywhere routing math is performed. +// Min boundaries are NOT normalized — "" already sorts lowest. +func TestNormalizeMaxBoundary(t *testing.T) { + require.Equal(t, "FF", normalizeMaxBoundary(""), "empty Max must normalize to FF") + require.Equal(t, "FF", normalizeMaxBoundary("FF"), "explicit FF must round-trip") + require.Equal(t, "80", normalizeMaxBoundary("80"), "non-empty Max must pass through unchanged") + require.Equal(t, "00", normalizeMaxBoundary("00")) + require.Equal(t, "AABBCCDD", normalizeMaxBoundary("AABBCCDD"), "long EPK must pass through unchanged") +} + +// TestRangesOverlap_TableDriven exercises the half-open interval overlap +// predicate. All four boundaries must be normalized hex EPK strings. 
+func TestRangesOverlap_TableDriven(t *testing.T) { + cases := []struct { + name string + aMin, aMax, bMin, bMax string + want bool + }{ + {"identical", "10", "20", "10", "20", true}, + {"a-contains-b", "00", "FF", "20", "30", true}, + {"b-contains-a", "20", "30", "00", "FF", true}, + {"a-overlaps-b-left", "00", "30", "20", "FF", true}, + {"a-overlaps-b-right", "20", "FF", "00", "30", true}, + {"adjacent-a-then-b", "00", "20", "20", "FF", false}, + {"adjacent-b-then-a", "20", "FF", "00", "20", false}, + {"disjoint-a-below", "00", "10", "20", "30", false}, + {"disjoint-a-above", "30", "FF", "00", "20", false}, + {"single-byte-overlap", "10", "11", "10", "FF", true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.want, rangesOverlap(c.aMin, c.aMax, c.bMin, c.bMax)) + }) + } +} + +// TestOverlappingPartitionKeyRanges_TableDriven covers the boundary +// behaviors of overlap-resolution against a snapshot routing map. This is +// the single most critical helper in the F1 fix — it replaces the exact- +// match findPartitionKeyRangeID that was the original-bug source. 
+func TestOverlappingPartitionKeyRanges_TableDriven(t *testing.T) { + threeRanges := []partitionKeyRange{ + {ID: "0", MinInclusive: "", MaxExclusive: "55"}, + {ID: "1", MinInclusive: "55", MaxExclusive: "AA"}, + {ID: "2", MinInclusive: "AA", MaxExclusive: "FF"}, + } + + cases := []struct { + name string + feed FeedRange + ranges []partitionKeyRange + wantIDs []string + }{ + { + name: "exact-match-single-range", + feed: FeedRange{MinInclusive: "55", MaxExclusive: "AA"}, + ranges: threeRanges, + wantIDs: []string{"1"}, + }, + { + name: "spans-all-three-ranges", + feed: FeedRange{MinInclusive: "", MaxExclusive: "FF"}, + ranges: threeRanges, + wantIDs: []string{"0", "1", "2"}, + }, + { + name: "spans-all-three-with-empty-max", + feed: FeedRange{MinInclusive: "", MaxExclusive: ""}, + ranges: threeRanges, + wantIDs: []string{"0", "1", "2"}, + }, + { + name: "strict-sub-range-of-one-physical", + feed: FeedRange{MinInclusive: "20", MaxExclusive: "40"}, + ranges: threeRanges, + wantIDs: []string{"0"}, + }, + { + name: "straddles-two-ranges", + feed: FeedRange{MinInclusive: "30", MaxExclusive: "70"}, + ranges: threeRanges, + wantIDs: []string{"0", "1"}, + }, + { + name: "preserves-input-order", + feed: FeedRange{MinInclusive: "10", MaxExclusive: "C0"}, + ranges: threeRanges, + wantIDs: []string{"0", "1", "2"}, + }, + { + name: "empty-snapshot-returns-nil", + feed: FeedRange{MinInclusive: "", MaxExclusive: "FF"}, + ranges: nil, + wantIDs: nil, + }, + { + name: "inverted-feed-range-returns-nil", + feed: FeedRange{MinInclusive: "FF", MaxExclusive: "00"}, + ranges: threeRanges, + wantIDs: nil, + }, + { + name: "equal-bounds-returns-nil", + feed: FeedRange{MinInclusive: "55", MaxExclusive: "55"}, + ranges: threeRanges, + wantIDs: nil, + }, + { + name: "no-overlap-with-non-empty-snapshot", + feed: FeedRange{MinInclusive: "10", MaxExclusive: "20"}, + ranges: []partitionKeyRange{ + {ID: "x", MinInclusive: "30", MaxExclusive: "40"}, + {ID: "y", MinInclusive: "60", MaxExclusive: 
"70"}, + }, + wantIDs: nil, + }, + { + name: "boundary-touch-is-not-overlap", + feed: FeedRange{MinInclusive: "55", MaxExclusive: "AA"}, + ranges: []partitionKeyRange{ + {ID: "left", MinInclusive: "00", MaxExclusive: "55"}, // touches but doesn't overlap + {ID: "match", MinInclusive: "55", MaxExclusive: "AA"}, + {ID: "right", MinInclusive: "AA", MaxExclusive: "FF"}, // touches but doesn't overlap + }, + wantIDs: []string{"match"}, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + out := overlappingPartitionKeyRanges(c.feed, c.ranges) + gotIDs := make([]string, 0, len(out)) + for _, r := range out { + gotIDs = append(gotIDs, r.ID) + } + if len(c.wantIDs) == 0 { + require.Empty(t, gotIDs) + } else { + require.Equal(t, c.wantIDs, gotIDs) + } + }) + } +} diff --git a/sdk/data/azcosmos/emulator_cosmos_change_feed_split_test.go b/sdk/data/azcosmos/emulator_cosmos_change_feed_split_test.go new file mode 100644 index 000000000000..a351c048c475 --- /dev/null +++ b/sdk/data/azcosmos/emulator_cosmos_change_feed_split_test.go @@ -0,0 +1,186 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestEmulatorChangeFeed_F1_SubRangeOfPhysicalRange exercises the original +// F1 bug pattern: a customer-supplied FeedRange that is a STRICT SUB-RANGE +// of a physical PK range (i.e., does not exact-match any pkrange boundary). +// Pre-F1 this returned silent no-data via the toHeaders exact-match path; +// post-F1 the overlap-resolution upstream finds the containing physical +// range and routes the request to it. 
+func TestEmulatorChangeFeed_F1_SubRangeOfPhysicalRange(t *testing.T) { + emulatorTests := newEmulatorTests(t) + client := emulatorTests.getClient(t, newSpanValidator(t, &spanMatcher{ExpectedSpans: []string{}})) + + database := emulatorTests.createDatabase(t, context.TODO(), client, "cfF1SubRange") + defer emulatorTests.deleteDatabase(t, context.TODO(), database) + + properties := ContainerProperties{ + ID: "cf-f1-sub", + PartitionKeyDefinition: PartitionKeyDefinition{Paths: []string{"/pk"}}, + } + throughput := NewManualThroughputProperties(400) + _, err := database.CreateContainer(context.TODO(), properties, &CreateContainerOptions{ThroughputProperties: &throughput}) + require.NoError(t, err) + + container, _ := database.NewContainer("cf-f1-sub") + + // Insert a few items + for i := 0; i < 5; i++ { + pkv := fmt.Sprintf("pk%d", i) + doc := map[string]interface{}{"id": fmt.Sprintf("item%d", i), "pk": pkv} + b, _ := json.Marshal(doc) + _, err := container.CreateItem(context.TODO(), NewPartitionKeyString(pkv), b, nil) + require.NoError(t, err) + } + + // Wait for change feed propagation. + time.Sleep(2 * time.Second) + + // FeedRange that is a strict sub-range of the typical "" -> "FF" + // container partition. Pre-F1 this did not exact-match any physical + // range and the request was silently dropped; post-F1 it resolves via + // overlap-match and the page is returned. + subRange := &FeedRange{MinInclusive: "10", MaxExclusive: "80"} + resp, err := container.GetChangeFeed(context.TODO(), &ChangeFeedOptions{ + FeedRange: subRange, + MaxItemCount: 10, + }) + require.NoError(t, err, "post-F1 sub-range FeedRange must resolve via overlap-match") + // The request must have been issued and a continuation token persisted, + // even if the page had zero results in this physical range. 
+ require.NotEmpty(t, resp.ContinuationToken, "continuation token must be populated") +} + +// TestEmulatorChangeFeed_F1_WideRangeMultiDrain inserts items across a +// container and reads the change feed using a wide FeedRange that overlaps +// every physical range. Verifies the composite continuation token is well- +// formed and that pagination across calls eventually drains every range +// (the multi-range queue path). +func TestEmulatorChangeFeed_F1_WideRangeMultiDrain(t *testing.T) { + emulatorTests := newEmulatorTests(t) + client := emulatorTests.getClient(t, newSpanValidator(t, &spanMatcher{ExpectedSpans: []string{}})) + + database := emulatorTests.createDatabase(t, context.TODO(), client, "cfF1WideDrain") + defer emulatorTests.deleteDatabase(t, context.TODO(), database) + + properties := ContainerProperties{ + ID: "cf-f1-wide", + PartitionKeyDefinition: PartitionKeyDefinition{Paths: []string{"/pk"}}, + } + // Higher throughput → emulator may allocate multiple physical PK ranges. + throughput := NewManualThroughputProperties(10000) + _, err := database.CreateContainer(context.TODO(), properties, &CreateContainerOptions{ThroughputProperties: &throughput}) + require.NoError(t, err) + + container, _ := database.NewContainer("cf-f1-wide") + + const totalItems = 50 + expectedIDs := make(map[string]bool, totalItems) + for i := 0; i < totalItems; i++ { + pkv := fmt.Sprintf("pk%d", i) + id := fmt.Sprintf("item%d", i) + expectedIDs[id] = true + doc := map[string]interface{}{"id": id, "pk": pkv, "i": i} + b, _ := json.Marshal(doc) + _, err := container.CreateItem(context.TODO(), NewPartitionKeyString(pkv), b, nil) + require.NoError(t, err) + } + + time.Sleep(3 * time.Second) + + // Wide FeedRange that overlaps every physical range. 
+ wide := &FeedRange{MinInclusive: "", MaxExclusive: "FF"} + + seen := make(map[string]bool, totalItems) + var token *string + for iter := 0; iter < 20; iter++ { // safety cap + opts := &ChangeFeedOptions{FeedRange: wide, MaxItemCount: 10, Continuation: token} + resp, err := container.GetChangeFeed(context.TODO(), opts) + require.NoError(t, err) + + // Verify token shape. + var ct compositeContinuationToken + require.NoError(t, json.Unmarshal([]byte(resp.ContinuationToken), &ct)) + require.Equal(t, cosmosCompositeContinuationTokenVersion, ct.Version) + require.GreaterOrEqual(t, len(ct.Continuation), 1, "composite token must carry at least one range") + + for _, raw := range resp.Documents { + var d map[string]interface{} + require.NoError(t, json.Unmarshal(raw, &d)) + if id, ok := d["id"].(string); ok { + seen[id] = true + } + } + + // 304 with no docs and an unchanged token signals drain complete. + if resp.Count == 0 && len(seen) >= totalItems { + break + } + // Loop forward with the rotated/composite token. + ctTok := resp.ContinuationToken + token = &ctTok + } + + for id := range expectedIDs { + require.True(t, seen[id], "expected to drain item %s across all ranges", id) + } +} + +// TestEmulatorChangeFeed_F1_CrossContainerTokenRejected verifies the +// cross-container guard: a continuation token whose embedded ResourceID +// belongs to a different container must be rejected loudly rather than +// misrouted against the wrong routing map. 
+func TestEmulatorChangeFeed_F1_CrossContainerTokenRejected(t *testing.T) { + emulatorTests := newEmulatorTests(t) + client := emulatorTests.getClient(t, newSpanValidator(t, &spanMatcher{ExpectedSpans: []string{}})) + + database := emulatorTests.createDatabase(t, context.TODO(), client, "cfF1XContainer") + defer emulatorTests.deleteDatabase(t, context.TODO(), database) + + throughput := NewManualThroughputProperties(400) + _, err := database.CreateContainer(context.TODO(), ContainerProperties{ + ID: "container-a", + PartitionKeyDefinition: PartitionKeyDefinition{Paths: []string{"/pk"}}, + }, &CreateContainerOptions{ThroughputProperties: &throughput}) + require.NoError(t, err) + _, err = database.CreateContainer(context.TODO(), ContainerProperties{ + ID: "container-b", + PartitionKeyDefinition: PartitionKeyDefinition{Paths: []string{"/pk"}}, + }, &CreateContainerOptions{ThroughputProperties: &throughput}) + require.NoError(t, err) + + containerA, _ := database.NewContainer("container-a") + containerB, _ := database.NewContainer("container-b") + + // Seed container-a so the change feed has a real token to issue. + doc := map[string]interface{}{"id": "a-item", "pk": "pk-a"} + b, _ := json.Marshal(doc) + _, err = containerA.CreateItem(context.TODO(), NewPartitionKeyString("pk-a"), b, nil) + require.NoError(t, err) + time.Sleep(2 * time.Second) + + respA, err := containerA.GetChangeFeed(context.TODO(), &ChangeFeedOptions{ + FeedRange: &FeedRange{MinInclusive: "", MaxExclusive: "FF"}, + }) + require.NoError(t, err) + require.NotEmpty(t, respA.ContinuationToken) + + // Hand container-a's token to container-b → must be rejected. + tokenA := respA.ContinuationToken + _, err = containerB.GetChangeFeed(context.TODO(), &ChangeFeedOptions{ + Continuation: &tokenA, + }) + require.Error(t, err, "cross-container continuation token must be rejected") +}