Skip to content

Stand By Feed and ChangeFeed pull model#105

Merged
kirankumarkolli merged 43 commits intomasterfrom
users/ealsur/cfpull
Apr 9, 2019
Merged

Stand By Feed and ChangeFeed pull model#105
kirankumarkolli merged 43 commits intomasterfrom
users/ealsur/cfpull

Conversation

@ealsur
Copy link
Copy Markdown
Member

@ealsur ealsur commented Apr 3, 2019

Poll model iterator for Change Feed

This PR introduces a GetStandByFeedIterator to CosmosItems that will sequentially read the Change Feed for a particular Container.

The sequential read will be done at the PartitionKeyRange level, reading each range until a 304 is obtained and continuing with the next range across FetchNextAsync calls.

In each iteration, a ContinuationToken will be returned that is a composite of a list of continuation tokens for all existing ranges.

The HasMoreResults variable is expected to be always true and is on the caller's responsibility to break out of the loop based on a particular logic (ie. when all ranges have been visited or when # amount of ranges have been visited).

Internally, the Iterator will handle the following error scenarios:

  1. If the current range being read splits, a refresh of the children will be done and added to the continuation token for continuous read. Current range will be updated for a next call to FetchNextAsync.
  2. If the current request exceeds Cosmos DB max response size (4Mb) due to the MaxItemSize parameter, the caller will be signaled to retry with HasMoreResults true in the response and internally, updating MaxItemSize to half.
  3. If the current range does not exist anymore, it will be discarded from the token, and the next one will be available to read through a FetchNextAsync call (HasMoreResults will be true).

Additionally, when the user provides it's own ContinuationToken (obtained from a previous usage), the following validations will be done:

  1. If the first range to read is invalid, it will be discarded, and the next valid one will be updated in the response's Continuation, ready for a FetchNextAsync.
  2. If all ranges in the provided token are invalid/do not exist, a new token will be built based on the current PK Range distribution in the container will be generated and used.

Sample usage

CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items;
CosmosFeedResultSetIterator setIterator =
    itemsCore.GetStandByFeedIterator(new CosmosChangeFeedRequestOptions() {
       StartTime = DateTime.MinValue
});
while (setIterator.HasMoreResults)
{
    using (CosmosResponseMessage iterator =
        await setIterator.FetchNextSetAsync(this.cancellationToken))
    {
        Trace.WriteLine(iterator.Header.Continuation); // Global Continuation Token
        if (iterator.Content != null)
        {
            Trace.WriteLine(response.Count);
        }
    }
}

Comment thread Microsoft.Azure.Cosmos/src/Routing/PartitionRoutingHelper.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs Outdated
Copy link
Copy Markdown
Member

@kirankumarkolli kirankumarkolli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check comments

Comment thread Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs
Comment thread Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs
Comment thread Microsoft.Azure.Cosmos/src/RequestOptions/CosmosChangeFeedRequestOptions.cs Outdated
Comment thread Microsoft.Azure.Cosmos/src/RequestOptions/CosmosChangeFeedRequestOptions.cs Outdated
@kirankumarkolli
Copy link
Copy Markdown
Member

Sample usage refresh:
CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items;
CosmosFeedResultSetIterator setIterator =
itemsCore.GetStandByFeedIterator(new CosmosChangeFeedRequestOptions() {StartFromBeginning = true});

Copy link
Copy Markdown
Contributor

@j82w j82w left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future it might be nice to include a rough design overview mark down file.

// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since change feed is no longer part of the query pipeline maybe we should move it to a new namespace.

@kirankumarkolli kirankumarkolli merged commit 319c4ba into master Apr 9, 2019
@kirankumarkolli kirankumarkolli deleted the users/ealsur/cfpull branch April 9, 2019 20:39
@kirankumarkolli
Copy link
Copy Markdown
Member

Thanks alot @ealsur.

kirankumarkolli added a commit that referenced this pull request Apr 9, 2019
* Assemblyinfo clean-up (#104)

* Signed build assembly info fix

* Assembly info clean-up

* Including test projects internal visible to (#107)

* Stand By Feed and ChangeFeed pull model (#105)

* Adding initial files

* Using Etag for continuation

* Removing unused

* Refactoring to reduce variables

* Refactoring to use CompositeToken

* Adding feed test

* Refactor through Options

* Adding public methods and comments

* Routing through the point transport handler

* Moving to outer if

* Adding split logic

* Adding unit tests

* Adding logic to detect invalid continuation tokens

* Adding JSON validation

* Routing based on PKRangeId

* Renaming and adding more tests

* Moving logic into the token

* Forcing refresh on split

* Addressing final coments

* Addressing feedback

* Added test to cover CT passing

* Refactoring and adding pkrangedelegate

* Argument checks

* Moving contract to CosmosRequestMessage

* Refactoring make EnsureInitialized async

* Moving tests to a new file

* Adding PKrange assert

* Refactored back to parameters outside Options

* UT split

* Adding Start* checks

* Adding new tests and renames

* Addressing comments

* Refactoring for cache tests

* Adding comments to tests

* Adding factory method

* Addressing comments

* Refactoring PKRange outside options

* Addressing comments

* Removing StartFromBeginning

* Removing extra lines

* Removing unnecessary ToList

* Raising version to 22-preview (#113)
kirankumarkolli added a commit that referenced this pull request Apr 30, 2019
* Raising version to 22-preview

* Merge master into release (CF Pull internal API) (#115)

* Assemblyinfo clean-up (#104)

* Signed build assembly info fix

* Assembly info clean-up

* Including test projects internal visible to (#107)

* Stand By Feed and ChangeFeed pull model (#105)

* Adding initial files

* Using Etag for continuation

* Removing unused

* Refactoring to reduce variables

* Refactoring to use CompositeToken

* Adding feed test

* Refactor through Options

* Adding public methods and comments

* Routing through the point transport handler

* Moving to outer if

* Adding split logic

* Adding unit tests

* Adding logic to detect invalid continuation tokens

* Adding JSON validation

* Routing based on PKRangeId

* Renaming and adding more tests

* Moving logic into the token

* Forcing refresh on split

* Addressing final coments

* Addressing feedback

* Added test to cover CT passing

* Refactoring and adding pkrangedelegate

* Argument checks

* Moving contract to CosmosRequestMessage

* Refactoring make EnsureInitialized async

* Moving tests to a new file

* Adding PKrange assert

* Refactored back to parameters outside Options

* UT split

* Adding Start* checks

* Adding new tests and renames

* Addressing comments

* Refactoring for cache tests

* Adding comments to tests

* Adding factory method

* Addressing comments

* Refactoring PKRange outside options

* Addressing comments

* Removing StartFromBeginning

* Removing extra lines

* Removing unnecessary ToList

* Raising version to 22-preview (#113)

* fixing build errors

* fixing incorrect merges

* remove unnecessary usings
kirankumarkolli added a commit that referenced this pull request May 6, 2019
* Raising version to 22-preview

* Merge master into release (CF Pull internal API) (#115)

* Assemblyinfo clean-up (#104)

* Signed build assembly info fix

* Assembly info clean-up

* Including test projects internal visible to (#107)

* Stand By Feed and ChangeFeed pull model (#105)

* Adding initial files

* Using Etag for continuation

* Removing unused

* Refactoring to reduce variables

* Refactoring to use CompositeToken

* Adding feed test

* Refactor through Options

* Adding public methods and comments

* Routing through the point transport handler

* Moving to outer if

* Adding split logic

* Adding unit tests

* Adding logic to detect invalid continuation tokens

* Adding JSON validation

* Routing based on PKRangeId

* Renaming and adding more tests

* Moving logic into the token

* Forcing refresh on split

* Addressing final coments

* Addressing feedback

* Added test to cover CT passing

* Refactoring and adding pkrangedelegate

* Argument checks

* Moving contract to CosmosRequestMessage

* Refactoring make EnsureInitialized async

* Moving tests to a new file

* Adding PKrange assert

* Refactored back to parameters outside Options

* UT split

* Adding Start* checks

* Adding new tests and renames

* Addressing comments

* Refactoring for cache tests

* Adding comments to tests

* Adding factory method

* Addressing comments

* Refactoring PKRange outside options

* Addressing comments

* Removing StartFromBeginning

* Removing extra lines

* Removing unnecessary ToList

* Raising version to 22-preview (#113)

* fix bugs

* Fixing signing issue
kirankumarkolli added a commit that referenced this pull request May 12, 2019
* Raising version to 22-preview

* Merge master into release (CF Pull internal API) (#115)

* Assemblyinfo clean-up (#104)

* Signed build assembly info fix

* Assembly info clean-up

* Including test projects internal visible to (#107)

* Stand By Feed and ChangeFeed pull model (#105)

* Adding initial files

* Using Etag for continuation

* Removing unused

* Refactoring to reduce variables

* Refactoring to use CompositeToken

* Adding feed test

* Refactor through Options

* Adding public methods and comments

* Routing through the point transport handler

* Moving to outer if

* Adding split logic

* Adding unit tests

* Adding logic to detect invalid continuation tokens

* Adding JSON validation

* Routing based on PKRangeId

* Renaming and adding more tests

* Moving logic into the token

* Forcing refresh on split

* Addressing final coments

* Addressing feedback

* Added test to cover CT passing

* Refactoring and adding pkrangedelegate

* Argument checks

* Moving contract to CosmosRequestMessage

* Refactoring make EnsureInitialized async

* Moving tests to a new file

* Adding PKrange assert

* Refactored back to parameters outside Options

* UT split

* Adding Start* checks

* Adding new tests and renames

* Addressing comments

* Refactoring for cache tests

* Adding comments to tests

* Adding factory method

* Addressing comments

* Refactoring PKRange outside options

* Addressing comments

* Removing StartFromBeginning

* Removing extra lines

* Removing unnecessary ToList

* Raising version to 22-preview (#113)

* fix bugs

* Fixing signing issue

* Hot fixes for CosmosSerializationOptions and query partition key issues (#207)

* Hot fixes for CosmosSerializationOptions and query partition key.

* initial commit (#193)

* Fixed formatting

* added new system strings (#210)

* Ignoring TestLazyIndexAllTerms (#211)

* PartitionedCRUDTest ignored (#217)

* Exception-less Streaming API for non-existing NP container scenarios (#227)

* First cut of fix

* Refreshing syntax as per C#7

* ComosContextCore depenency on ExecUtils removed.
Few proxy methods from ExecUtils are removed.

* SDK version to 3.0.0.11-preview and Direct dependency to 3.0.0.27-preview

* Cherry pick 15a9e26 into 3.0.0.11_preview (#235)

* Query routing with partition key definition (#213)

* Fix breaks for the release

* Remove unused usings

* Adding a unit test to verify setting PartitionKeyDefinition works. (#226)

* More

* Revert "Fixing samples build break (#236)"

This reverts commit cda8903.

* Force samples to Nuget.org only

* Fixes for query pipeline

* Some more fixes

* Np tests and quarantine open-partition tests

* small correction

* Direct contract test pruning

* Few test fixes

* Bug fix for paramaterised queries

* Including NP query tests

* One more test fix

* Fix for MixedModeTypoes unit test

* MixedMode multi-targeting tests clean.

* Enabling Distinct and TopOrderBy for NP.

* Quarantine TestQueryCrossPartitionAggregateFunctionsWithMixedTypes

* Test fixes.

* BadRequestException fixes

* Owner attribute removed
kirankumarkolli added a commit that referenced this pull request Apr 23, 2026
…titioned lease containers (#5807)

**Research report (gist):**
https://gist.github.com/kirankumarkolli/8bf1cbb9c3ee45dfa64511c4a0cb35ad

## Problem statement

In V3 SDK 3.20.0+, when the lease container is partitioned on
`/partitionKey` (used for Gremlin/Graph API accounts and any
customer-chosen layout),
`DocumentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync` sets the
new lease's partition-key **value** to `Guid.NewGuid().ToString()`.
Because Cosmos DB's `id` uniqueness is scoped per logical partition, two
racing hosts that compute the same deterministic lease `id` land in
different logical partitions and both succeed with `201 Created` instead
of the `409 Conflict` the dedup logic relies on.

**Root cause:** V3 PR #2491 (2021-05-27) and its V2 predecessor PR #158
silently violated the original V2 PR #105 (2018) invariant that *"lease
collection PK must be a function of monitored collection PK Range. We
already have that function as lease id."*

## Fix (minimal, env-gated with safe default)

### New env var

| Variable | Default | Purpose |
|---|---|---|
| `AZURE_COSMOS_CHANGE_FEED_LEASE_ID_AS_PARTITION_KEY_ENABLED` | `true`
| When `true`, lease docs in a `/partitionKey`-partitioned lease
container use the deterministic lease `id` as the partition-key value
(restores the 409-based dedup invariant). When `false`, falls back to
legacy `Guid.NewGuid()` behavior. |

### Code changes

**`ConfigurationManager.cs`** — New env-var constant and accessor
`IsChangeFeedLeaseIdAsPartitionKeyEnabled()`, mirroring the established
`AZURE_COSMOS_*_ENABLED` pattern.

**`DocumentServiceLeaseManagerCosmos.cs`** — New
`useDeterministicPartitionKey` field snapshotted in the constructor
(ensures consistency for the manager's lifetime). New
`GetLeasePartitionKeyValue(leaseId)` helper used at both
`CreateLeaseIfNotExistAsync` call sites, replacing the
`Guid.NewGuid().ToString()` calls.

### Why this is the right fix

| Property | Status after fix |
|---|---|
| PK path | Unchanged (`/partitionKey`) — Gremlin still works |
| Public API surface | Unchanged |
| Default behavior | **New deterministic PK** (fix is on by default) |
| Operational escape hatch |
`AZURE_COSMOS_CHANGE_FEED_LEASE_ID_AS_PARTITION_KEY_ENABLED=false`
reverts per-process |
| Dedup invariant (PR #105) | **Restored** for all three lease-container
layouts |
| Back-compat read path | Unchanged — `TryGetLeaseAsync` uses stored PK
value |

### Not required (explicitly)

- No schema migration — old GUID-PK leases coexist with new `id`-PK
leases; both read/update paths route through the stored `PartitionKey`
field.
- No change to
`PartitionedByPartitionKeyCollectionRequestOptionsFactory` (already
routes by stored PK value).

## Risks and mitigations

| # | Risk | Likelihood | Mitigation |
|---|---|---|---|
| 1 | Existing customers carry a mix of GUID-PK (old) and `id`-PK (new)
leases | Certain | Safe by design — `TryGetLeaseAsync` reads stored
`lease.PartitionKey`; old leases remain readable/updatable |
| 2 | Duplicate leases already written under old code persist | Certain
| Out of scope — not self-clearing; manual cleanup needed |
| 3 | Mixed-fleet window during rolling upgrade | Certain during rollout
| Dedup fully restored only after all hosts upgraded. Not a new failure
mode. |
| 4 | Hot-partition risk | Very low | Each lease has a different PK
value (per-leaseToken), so distribution is similar to `/id` layout |

### Upgrade semantics — mixed-fleet behavior

During rolling upgrade (old + new SDK hosts coexist):
- Read/update/delete/checkpoint of **existing** leases: works on both
sides (routes through stored `lease.PartitionKey`).
- Concurrent **create** of the same missing lease: still produces
duplicates if one host is old SDK (GUID PK) and another is new (id PK).
**Dedup is fully restored only after rollout completes.**

### Known limitation — pre-existing duplicates not auto-cleaned

Duplicate leases written by older SDK versions persist until the
corresponding PKRange undergoes a split/merge. Manual cleanup (`SELECT *
FROM c WHERE c.id = @id` + `DELETE` extras) is needed for existing
duplicates.

## Test coverage

### Unit tests — `DocumentServiceLeaseManagerCosmosTests.cs`

| Test | What it proves |
|---|---|
| `CreateLeaseIfNotExistAsync_PartitionKeyBehavior` (4 DataRows) |
Parameterized across PKRange/EPK × default/opt-out: captures the
`PartitionKey` sent in the create request and asserts deterministic (PK
= lease.Id) or legacy (PK = GUID ≠ lease.Id) behavior |
| `CreateLeaseIfNotExistAsync_DuplicatePkId_Returns409` (2 DataRows) |
Simulates Cosmos DB's (PK, id) uniqueness constraint via
`ConcurrentDictionary` — first create succeeds, second with same (PK,
id) gets 409 |
| `AcquireCompletes_WithPreExistingGuidPartitionKey` | Back-compat:
lease with old random-GUID PK can still be acquired (stored PK
preserved) |
| `RenewCompletes_WithPreExistingGuidPartitionKey` | Back-compat: lease
with old random-GUID PK can still be renewed (stored PK preserved) |
| Existing `CreatesEPKBasedLease` / `CreatesPartitionKeyBasedLease` |
**Strengthened:** added `Assert.AreEqual(lease.Id, lease.PartitionKey)`
for `/partitionKey` factory |

Infrastructure: `[DoNotParallelize]` +
`[TestInitialize]`/`[TestCleanup]` for env var safety. Shared helpers
eliminate duplication across overload types.

### Emulator integration test — `GremlinSmokeTests.cs`

| Test | What changed |
|---|---|
| `Schema_DefaultsToHavingPartitionKey` | Converted to
`[DataTestMethod]` with two rows: **(a)** default env → asserts
`partitionKey == lease.id` (dedup invariant), **(b)** env var `false` →
asserts `partitionKey` is a GUID ≠ `lease.id` (legacy behavior) |

Infrastructure: `[DoNotParallelize]` added (env var mutation is
process-global).

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: kirankumarkolli <6880899+kirankumarkolli@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants