Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

### Bugs Fixed

* Fixed `GetChangeFeed` to survive partition splits: customer-supplied `FeedRange`s are now overlap-matched against the routing map, `410/Gone` triggers a cache refresh and bounded retry, split parents expand into per-child queue entries (inheriting the parent's ETag), and the continuation token persists multi-range state across calls. Continuation tokens are guarded against cross-container reuse. See [PR 26768](https://github.com/Azure/azure-sdk-for-go/pull/26768).
* Fixed V2 partition key routing: the top 2 bits of the first EPK byte are now masked to stay within the partition key range space [0x00, 0x3F]. Previously, items whose V2 hash started with a byte >= 0x40 could fail routing in ReadMany because the EPK lexicographically exceeded the "FF" range sentinel. See [PR 26723](https://github.com/Azure/azure-sdk-for-go/pull/26723)
* Fixed error handling for partition key range calls which would previously cause panics on any error. See [PR 26723](https://github.com/Azure/azure-sdk-for-go/pull/26723)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package azcosmos

import "github.com/Azure/azure-sdk-for-go/sdk/azcore"

// Version 1 is the initial version of the composite continuation token.
const cosmosCompositeContinuationTokenVersion = 1

Expand All @@ -25,3 +27,51 @@ func newCompositeContinuationToken(resourceID string, continuation []changeFeedR
Continuation: continuation,
}
}

// head returns a pointer to the head queue entry, or nil if the queue is empty.
// Callers MUST NOT mutate the returned entry; use replaceHeadWithChildren or
// advance instead, which preserve queue invariants.
func (t *compositeContinuationToken) head() *changeFeedRange {
if t == nil || len(t.Continuation) == 0 {
return nil
}
return &t.Continuation[0]
}

// advance rotates the head entry to the tail of the queue, updating its
// ContinuationToken to the freshly-returned ETag from the just-completed
// request. This is the FIFO rotation used after every successful 200 (or
// 304 — both progress the per-range ETag).
//
// No-op if the queue is empty. If newETag is empty, the head's existing
// ContinuationToken is preserved (the service didn't issue a new ETag,
// e.g., on a 304 without one).
func (t *compositeContinuationToken) advance(newETag azcore.ETag) {
if t == nil || len(t.Continuation) == 0 {
return
}
head := t.Continuation[0]
if newETag != "" {
etagCopy := newETag
head.ContinuationToken = &etagCopy
}

// Rotate: drop head, append to tail.
t.Continuation = append(t.Continuation[1:], head)
}

// replaceHeadWithChildren replaces the head queue entry with the provided
// child entries, preserving the order of the children at the front of the
// queue. Used when a 410/Gone refresh reveals a split (parent → N children)
// or when initial overlap resolution returns multiple children for one
// customer-supplied FeedRange.
//
// No-op if children is empty (degenerate split that produces no overlap).
// No-op if the queue is empty (caller should not have called this).
func (t *compositeContinuationToken) replaceHeadWithChildren(children []changeFeedRange) {
if t == nil || len(t.Continuation) == 0 || len(children) == 0 {
return
}
rest := t.Continuation[1:]
t.Continuation = append(append(make([]changeFeedRange, 0, len(children)+len(rest)), children...), rest...)
}
75 changes: 33 additions & 42 deletions sdk/data/azcosmos/cosmos_change_feed_request_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package azcosmos

import (
"encoding/json"
"fmt"
"strconv"
"time"
)
Expand Down Expand Up @@ -39,55 +39,46 @@ type ChangeFeedOptions struct {
ThroughputBucket *int32
}

func (options *ChangeFeedOptions) toHeaders(partitionKeyRanges []partitionKeyRange) *map[string]string {
headers := make(map[string]string)

// buildRequestHeaders constructs the exact headers needed for a single
// change-feed request against one queue head. Pure builder: callers MUST
// supply the head and the already-resolved PK-range ID (overlap-matched
// via the routing map, NOT exact-matched).
//
// This is the path used by the new queue-driven GetChangeFeed loop. The
// caller-options-level Continuation token is NOT consulted here — the
// queue-head ETag drives If-None-Match because the queue may have been
// split-expanded since the token was issued.
//
// Returns an error when PartitionKey serialization fails — sending a
// change-feed read with a missing PK header would yield an opaque
// server-side error, so we surface the cause to the caller instead.
func (options *ChangeFeedOptions) buildRequestHeaders(head changeFeedRange, resolvedPKRangeID string) (map[string]string, error) {
headers := make(map[string]string, 6)
headers[cosmosHeaderChangeFeed] = cosmosHeaderValuesChangeFeed

if options.MaxItemCount > 0 {
headers[cosmosHeaderMaxItemCount] = strconv.FormatInt(int64(options.MaxItemCount), 10)
}

if options.StartFrom != nil {
formatted := options.StartFrom.UTC().Format(time.RFC1123)
headers[cosmosHeaderIfModifiedSince] = formatted
}

if options.Continuation != nil && *options.Continuation != "" {
var compositeToken compositeContinuationToken
if err := json.Unmarshal([]byte(*options.Continuation), &compositeToken); err == nil && len(compositeToken.Continuation) > 0 {
if compositeToken.Continuation[0].ContinuationToken != nil {
headers[headerIfNoneMatch] = string(*compositeToken.Continuation[0].ContinuationToken)
}
if options.FeedRange == nil {
options.FeedRange = &FeedRange{
MinInclusive: compositeToken.Continuation[0].MinInclusive,
MaxExclusive: compositeToken.Continuation[0].MaxExclusive,
}
}
} else {
headers[headerIfNoneMatch] = *options.Continuation
if options != nil {
if options.MaxItemCount > 0 {
headers[cosmosHeaderMaxItemCount] = strconv.FormatInt(int64(options.MaxItemCount), 10)
}
}

if options.PartitionKey != nil {
partitionKeyJSON, err := options.PartitionKey.toJsonString()
if err == nil {
headers[cosmosHeaderPartitionKey] = string(partitionKeyJSON)
if options.StartFrom != nil {
headers[cosmosHeaderIfModifiedSince] = options.StartFrom.UTC().Format(time.RFC1123)
}
if options.PartitionKey != nil {
pkJSON, err := options.PartitionKey.toJsonString()
if err != nil {
return nil, fmt.Errorf("ChangeFeedOptions: serializing PartitionKey: %w", err)
}
headers[cosmosHeaderPartitionKey] = string(pkJSON)
}
}

if options.FeedRange != nil && len(partitionKeyRanges) > 0 {
if id, err := findPartitionKeyRangeID(*options.FeedRange, partitionKeyRanges); err == nil {
headers[headerXmsDocumentDbPartitionKeyRangeId] = id
} else {
return nil
}
if head.ContinuationToken != nil && *head.ContinuationToken != "" {
headers[headerIfNoneMatch] = string(*head.ContinuationToken)
}

if len(headers) == 0 {
return nil
if resolvedPKRangeID != "" {
headers[headerXmsDocumentDbPartitionKeyRangeId] = resolvedPKRangeID
}

return &headers
return headers, nil
}
Loading