-
Notifications
You must be signed in to change notification settings - Fork 944
.NET: Add Cosmos DB implementations for ChatMessageStore and CheckpointStore. #1838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
westey-m
merged 44 commits into
microsoft:main
from
TheovanKraay:csharp-cosmosdb-store-implementations
Nov 26, 2025
Merged
Changes from all commits
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
b778a52
draft commit
TheovanKraay 2208d39
Added Cosmos agent thread and tests
TheovanKraay 67c65aa
revert unnecessary changes and fix tests
TheovanKraay d020e18
add multi-tenant support with hierarchical partition keys (and tests).
TheovanKraay 9de2fe8
enhance transactional batch
TheovanKraay b80c59e
address review comments
TheovanKraay 042db62
Address PR review comments from @westey-m
TheovanKraay 405d0ca
Merge upstream/main - resolve slnx conflicts
TheovanKraay 7201533
Merge upstream/main into csharp-cosmosdb-store-implementations
TheovanKraay 7672a76
use param validation helpers
TheovanKraay 394b749
Replace useManagedIdentity boolean with TokenCredential parameter
TheovanKraay 76f300d
Remove redundant suppressions and fix tests
TheovanKraay bbbc651
Rename project from Microsoft.Agents.AI.Abstractions.CosmosNoSql to M…
TheovanKraay 2015a76
Refactor constructors to use chaining pattern
TheovanKraay bea1b47
Reorder deserialization constructor parameters for consistency
TheovanKraay 721a0b0
Remove database/container IDs from serialized state
TheovanKraay c2aae5f
Remove auto-generation of MessageId
TheovanKraay c5ad371
Optimize AddMessagesAsync to avoid enumeration when possible
TheovanKraay ba719f1
Add MaxMessagesToRetrieve to limit context window
TheovanKraay ba431e5
Make Role nullable instead of defaulting
TheovanKraay aef9491
Merge branch 'main' into csharp-cosmosdb-store-implementations
TheovanKraay 03e7621
Fix net472 build without rebasing 19 commits
TheovanKraay 8d0fa99
Add Cosmos DB emulator to CI workflow
TheovanKraay 823b59a
Fix Cosmos DB emulator tests: use Skip.If instead of Assert.Fail and …
TheovanKraay ecc3498
Replace Skip.If() with conditional return to fix compilation
TheovanKraay 7a574c6
Use env var to skip Cosmos tests on non-Windows CI
TheovanKraay 8f0a3ba
Add Xunit.SkippableFact package to properly skip Cosmos tests on Linux
TheovanKraay 76ba62b
Change [Fact] to [SkippableFact] for proper test skipping behavior
TheovanKraay 4862ca4
Remove stale Microsoft.Agents.AI.Abstractions.CosmosNoSql directory
TheovanKraay b5675d6
Fix code formatting: add braces, this. qualifications, and final newl…
TheovanKraay e8c0e27
Fix file encoding to UTF-8 with BOM, fix import ordering, and remove …
TheovanKraay ec0a454
Convert backing fields to auto-properties and remove Azure.Identity u…
TheovanKraay 0b49065
Fix CosmosChatMessageStore.cs encoding back to UTF-8 with BOM
TheovanKraay e6f4b79
Fix test file formatting: indentation, encoding, imports, this. quali…
TheovanKraay 1d32377
Fix const field naming violations: Remove s_ prefix from const fields…
TheovanKraay b40a8ce
Add local .editorconfig for Cosmos DB tests to suppress IDE0005 false…
TheovanKraay 10aecaa
Fix IDE1006 naming violations: Rename TestDatabaseId to s_testDatabas…
TheovanKraay d3200f7
Address PR review comments
TheovanKraay 59b9cf8
Fix IDE0001 formatting error in AgentProviderExtensions.cs. Use type …
TheovanKraay 9c736b5
Merge upstream/main into csharp-cosmosdb-store-implementations
TheovanKraay 108d7ab
Update package versions for Aspire 13.0.0 compatibility
TheovanKraay 4c1a6b1
Merge upstream/main into csharp-cosmosdb-store-implementations
TheovanKraay d472ad4
Fix TargetFrameworks in Cosmos DB projects
TheovanKraay 5f06177
Remove redundant counter, add partition key validation, use factory p…
TheovanKraay File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
688 changes: 688 additions & 0 deletions
688
dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosChatMessageStore.cs
Large diffs are not rendered by default.
Oops, something went wrong.
279 changes: 279 additions & 0 deletions
279
dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosCheckpointStore.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,279 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics.CodeAnalysis; | ||
| using System.Linq; | ||
| using System.Text.Json; | ||
| using System.Threading.Tasks; | ||
| using Azure.Core; | ||
| using Microsoft.Azure.Cosmos; | ||
| using Microsoft.Shared.Diagnostics; | ||
| using Newtonsoft.Json; | ||
| using Newtonsoft.Json.Linq; | ||
|
|
||
| namespace Microsoft.Agents.AI.Workflows.Checkpointing; | ||
|
|
||
| /// <summary> | ||
| /// Provides a Cosmos DB implementation of the <see cref="JsonCheckpointStore"/> abstract class. | ||
| /// </summary> | ||
| /// <typeparam name="T">The type of objects to store as checkpoint values.</typeparam> | ||
| [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] | ||
| [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] | ||
| public class CosmosCheckpointStore<T> : JsonCheckpointStore, IDisposable | ||
| { | ||
| private readonly CosmosClient _cosmosClient; | ||
| private readonly Container _container; | ||
| private readonly bool _ownsClient; | ||
| private bool _disposed; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="CosmosCheckpointStore{T}"/> class using a connection string. | ||
| /// </summary> | ||
| /// <param name="connectionString">The Cosmos DB connection string.</param> | ||
| /// <param name="databaseId">The identifier of the Cosmos DB database.</param> | ||
| /// <param name="containerId">The identifier of the Cosmos DB container.</param> | ||
| /// <exception cref="ArgumentNullException">Thrown when any required parameter is null.</exception> | ||
| /// <exception cref="ArgumentException">Thrown when any string parameter is null or whitespace.</exception> | ||
| public CosmosCheckpointStore(string connectionString, string databaseId, string containerId) | ||
| { | ||
| var cosmosClientOptions = new CosmosClientOptions(); | ||
|
|
||
| this._cosmosClient = new CosmosClient(Throw.IfNullOrWhitespace(connectionString), cosmosClientOptions); | ||
| this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); | ||
| this._ownsClient = true; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="CosmosCheckpointStore{T}"/> class using a TokenCredential for authentication. | ||
| /// </summary> | ||
| /// <param name="accountEndpoint">The Cosmos DB account endpoint URI.</param> | ||
| /// <param name="tokenCredential">The TokenCredential to use for authentication (e.g., DefaultAzureCredential, ManagedIdentityCredential).</param> | ||
| /// <param name="databaseId">The identifier of the Cosmos DB database.</param> | ||
| /// <param name="containerId">The identifier of the Cosmos DB container.</param> | ||
| /// <exception cref="ArgumentNullException">Thrown when any required parameter is null.</exception> | ||
| /// <exception cref="ArgumentException">Thrown when any string parameter is null or whitespace.</exception> | ||
| public CosmosCheckpointStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId) | ||
| { | ||
| var cosmosClientOptions = new CosmosClientOptions | ||
| { | ||
| SerializerOptions = new CosmosSerializationOptions | ||
| { | ||
| PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase | ||
| } | ||
| }; | ||
|
|
||
| this._cosmosClient = new CosmosClient(Throw.IfNullOrWhitespace(accountEndpoint), Throw.IfNull(tokenCredential), cosmosClientOptions); | ||
| this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); | ||
| this._ownsClient = true; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="CosmosCheckpointStore{T}"/> class using an existing <see cref="CosmosClient"/>. | ||
| /// </summary> | ||
| /// <param name="cosmosClient">The <see cref="CosmosClient"/> instance to use for Cosmos DB operations.</param> | ||
| /// <param name="databaseId">The identifier of the Cosmos DB database.</param> | ||
| /// <param name="containerId">The identifier of the Cosmos DB container.</param> | ||
| /// <exception cref="ArgumentNullException">Thrown when <paramref name="cosmosClient"/> is null.</exception> | ||
| /// <exception cref="ArgumentException">Thrown when any string parameter is null or whitespace.</exception> | ||
| public CosmosCheckpointStore(CosmosClient cosmosClient, string databaseId, string containerId) | ||
| { | ||
| this._cosmosClient = Throw.IfNull(cosmosClient); | ||
|
|
||
| this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); | ||
| this._ownsClient = false; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the identifier of the Cosmos DB database. | ||
| /// </summary> | ||
| public string DatabaseId => this._container.Database.Id; | ||
|
|
||
| /// <summary> | ||
| /// Gets the identifier of the Cosmos DB container. | ||
| /// </summary> | ||
| public string ContainerId => this._container.Id; | ||
|
|
||
| /// <inheritdoc /> | ||
| public override async ValueTask<CheckpointInfo> CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(runId)) | ||
| { | ||
| throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); | ||
| } | ||
|
|
||
| #pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks | ||
| if (this._disposed) | ||
| { | ||
| throw new ObjectDisposedException(this.GetType().FullName); | ||
| } | ||
| #pragma warning restore CA1513 | ||
|
|
||
| var checkpointId = Guid.NewGuid().ToString("N"); | ||
| var checkpointInfo = new CheckpointInfo(runId, checkpointId); | ||
|
|
||
| var document = new CosmosCheckpointDocument | ||
| { | ||
| Id = $"{runId}_{checkpointId}", | ||
| RunId = runId, | ||
| CheckpointId = checkpointId, | ||
| Value = JToken.Parse(value.GetRawText()), | ||
| ParentCheckpointId = parent?.CheckpointId, | ||
| Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() | ||
| }; | ||
|
|
||
| await this._container.CreateItemAsync(document, new PartitionKey(runId)).ConfigureAwait(false); | ||
| return checkpointInfo; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public override async ValueTask<JsonElement> RetrieveCheckpointAsync(string runId, CheckpointInfo key) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(runId)) | ||
| { | ||
| throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); | ||
| } | ||
|
|
||
| if (key is null) | ||
| { | ||
| throw new ArgumentNullException(nameof(key)); | ||
| } | ||
|
|
||
| #pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks | ||
| if (this._disposed) | ||
| { | ||
| throw new ObjectDisposedException(this.GetType().FullName); | ||
| } | ||
| #pragma warning restore CA1513 | ||
|
|
||
| var id = $"{runId}_{key.CheckpointId}"; | ||
|
|
||
| try | ||
| { | ||
| var response = await this._container.ReadItemAsync<CosmosCheckpointDocument>(id, new PartitionKey(runId)).ConfigureAwait(false); | ||
| using var document = JsonDocument.Parse(response.Resource.Value.ToString()); | ||
| return document.RootElement.Clone(); | ||
| } | ||
| catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound) | ||
| { | ||
| throw new InvalidOperationException($"Checkpoint with ID '{key.CheckpointId}' for run '{runId}' not found."); | ||
| } | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public override async ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(runId)) | ||
| { | ||
| throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); | ||
| } | ||
|
|
||
| #pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks | ||
| if (this._disposed) | ||
| { | ||
| throw new ObjectDisposedException(this.GetType().FullName); | ||
| } | ||
| #pragma warning restore CA1513 | ||
|
|
||
| QueryDefinition query = withParent == null | ||
| ? new QueryDefinition("SELECT c.runId, c.checkpointId FROM c WHERE c.runId = @runId ORDER BY c.timestamp ASC") | ||
| .WithParameter("@runId", runId) | ||
| : new QueryDefinition("SELECT c.runId, c.checkpointId FROM c WHERE c.runId = @runId AND c.parentCheckpointId = @parentCheckpointId ORDER BY c.timestamp ASC") | ||
| .WithParameter("@runId", runId) | ||
| .WithParameter("@parentCheckpointId", withParent.CheckpointId); | ||
|
|
||
| var iterator = this._container.GetItemQueryIterator<CheckpointQueryResult>(query); | ||
| var checkpoints = new List<CheckpointInfo>(); | ||
|
|
||
| while (iterator.HasMoreResults) | ||
| { | ||
| var response = await iterator.ReadNextAsync().ConfigureAwait(false); | ||
| checkpoints.AddRange(response.Select(r => new CheckpointInfo(r.RunId, r.CheckpointId))); | ||
| } | ||
|
|
||
| return checkpoints; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public void Dispose() | ||
| { | ||
| this.Dispose(true); | ||
| GC.SuppressFinalize(this); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Releases the unmanaged resources used by the <see cref="CosmosCheckpointStore{T}"/> and optionally releases the managed resources. | ||
| /// </summary> | ||
| /// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param> | ||
| protected virtual void Dispose(bool disposing) | ||
| { | ||
| if (!this._disposed) | ||
| { | ||
| if (disposing && this._ownsClient) | ||
| { | ||
| this._cosmosClient?.Dispose(); | ||
| } | ||
| this._disposed = true; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Represents a checkpoint document stored in Cosmos DB. | ||
| /// </summary> | ||
| internal sealed class CosmosCheckpointDocument | ||
| { | ||
| [JsonProperty("id")] | ||
| public string Id { get; set; } = string.Empty; | ||
|
|
||
| [JsonProperty("runId")] | ||
| public string RunId { get; set; } = string.Empty; | ||
|
|
||
| [JsonProperty("checkpointId")] | ||
| public string CheckpointId { get; set; } = string.Empty; | ||
|
|
||
| [JsonProperty("value")] | ||
| public JToken Value { get; set; } = JValue.CreateNull(); | ||
|
|
||
| [JsonProperty("parentCheckpointId")] | ||
| public string? ParentCheckpointId { get; set; } | ||
|
|
||
| [JsonProperty("timestamp")] | ||
| public long Timestamp { get; set; } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Represents the result of a checkpoint query. | ||
| /// </summary> | ||
| [SuppressMessage("Performance", "CA1812:Avoid uninstantiated internal classes", Justification = "Instantiated by Cosmos DB query deserialization")] | ||
| private sealed class CheckpointQueryResult | ||
| { | ||
| public string RunId { get; set; } = string.Empty; | ||
| public string CheckpointId { get; set; } = string.Empty; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Provides a non-generic Cosmos DB implementation of the <see cref="JsonCheckpointStore"/> abstract class. | ||
| /// </summary> | ||
| [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] | ||
| [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] | ||
| public sealed class CosmosCheckpointStore : CosmosCheckpointStore<JsonElement> | ||
| { | ||
| /// <inheritdoc /> | ||
| public CosmosCheckpointStore(string connectionString, string databaseId, string containerId) | ||
| : base(connectionString, databaseId, containerId) | ||
| { | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public CosmosCheckpointStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId) | ||
| : base(accountEndpoint, tokenCredential, databaseId, containerId) | ||
| { | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public CosmosCheckpointStore(CosmosClient cosmosClient, string databaseId, string containerId) | ||
| : base(cosmosClient, databaseId, containerId) | ||
| { | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.