diff --git a/.github/workflows/dotnet-build-and-test.yml b/.github/workflows/dotnet-build-and-test.yml index 97d1d60b3a..6b2411d726 100644 --- a/.github/workflows/dotnet-build-and-test.yml +++ b/.github/workflows/dotnet-build-and-test.yml @@ -124,6 +124,16 @@ jobs: popd rm -rf "$TEMP_DIR" + # Start Cosmos DB Emulator for Cosmos-based unit tests (only on Windows) + - name: Start Azure Cosmos DB Emulator + if: runner.os == 'Windows' + shell: pwsh + run: | + Write-Host "Launching Azure Cosmos DB Emulator" + Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator" + Start-CosmosDbEmulator -NoUI -Key "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==" + echo "COSMOS_EMULATOR_AVAILABLE=true" >> $env:GITHUB_ENV + - name: Run Unit Tests shell: bash run: | @@ -143,6 +153,10 @@ jobs: echo "Skipping $project - does not support target framework ${{ matrix.targetFramework }} (supports: $target_frameworks)" fi done + env: + # Cosmos DB Emulator connection settings + COSMOSDB_ENDPOINT: https://localhost:8081 + COSMOSDB_KEY: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw== - name: Log event name and matrix integration-tests shell: bash @@ -181,6 +195,9 @@ jobs: fi done env: + # Cosmos DB Emulator connection settings + COSMOSDB_ENDPOINT: https://localhost:8081 + COSMOSDB_KEY: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw== # OpenAI Models OpenAI__ApiKey: ${{ secrets.OPENAI__APIKEY }} OpenAI__ChatModelId: ${{ vars.OPENAI__CHATMODELID }} diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index a49f0fc277..f9851d024b 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -23,6 +23,10 @@ + + + + @@ -127,6 +131,7 @@ + diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 9c5f1a81f3..be23a6869f 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -334,6 +334,7 @@ + @@ -372,6 +373,7 @@ + diff --git a/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosChatMessageStore.cs b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosChatMessageStore.cs new file mode 100644 index 0000000000..fff7f56fa5 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosChatMessageStore.cs @@ -0,0 +1,688 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.AI; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI; + +/// +/// Provides a Cosmos DB implementation of the abstract class. +/// +[RequiresUnreferencedCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with trimming.")] +[RequiresDynamicCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with NativeAOT.")] +public sealed class CosmosChatMessageStore : ChatMessageStore, IDisposable +{ + private readonly CosmosClient _cosmosClient; + private readonly Container _container; + private readonly bool _ownsClient; + private bool _disposed; + + // Hierarchical partition key support + private readonly string? _tenantId; + private readonly string? _userId; + private readonly PartitionKey _partitionKey; + private readonly bool _useHierarchicalPartitioning; + + /// + /// Cached JSON serializer options for .NET 9.0 compatibility. + /// + private static readonly JsonSerializerOptions s_defaultJsonOptions = CreateDefaultJsonOptions(); + + private static JsonSerializerOptions CreateDefaultJsonOptions() + { + var options = new JsonSerializerOptions(); +#if NET9_0_OR_GREATER + // Configure TypeInfoResolver for .NET 9.0 to enable JSON serialization + options.TypeInfoResolver = new System.Text.Json.Serialization.Metadata.DefaultJsonTypeInfoResolver(); +#endif + return options; + } + + /// + /// Gets or sets the maximum number of messages to return in a single query batch. + /// Default is 100 for optimal performance. + /// + public int MaxItemCount { get; set; } = 100; + + /// + /// Gets or sets the maximum number of items per transactional batch operation. + /// Default is 100, maximum allowed by Cosmos DB is 100. + /// + public int MaxBatchSize { get; set; } = 100; + + /// + /// Gets or sets the maximum number of messages to retrieve from the store. + /// This helps prevent exceeding LLM context windows in long conversations. + /// Default is null (no limit). When set, only the most recent messages are returned. + /// + public int? MaxMessagesToRetrieve { get; set; } + + /// + /// Gets or sets the Time-To-Live (TTL) in seconds for messages. + /// Default is 86400 seconds (24 hours). Set to null to disable TTL. + /// + public int? MessageTtlSeconds { get; set; } = 86400; + + /// + /// Gets the conversation ID associated with this message store. + /// + public string ConversationId { get; init; } + + /// + /// Gets the database ID associated with this message store. + /// + public string DatabaseId { get; init; } + + /// + /// Gets the container ID associated with this message store. + /// + public string ContainerId { get; init; } + + /// + /// Internal primary constructor used by all public constructors. + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The unique identifier for this conversation thread. + /// Whether this instance owns the CosmosClient and should dispose it. + /// Optional tenant identifier for hierarchical partitioning. + /// Optional user identifier for hierarchical partitioning. + internal CosmosChatMessageStore(CosmosClient cosmosClient, string databaseId, string containerId, string conversationId, bool ownsClient, string? tenantId = null, string? userId = null) + { + this._cosmosClient = Throw.IfNull(cosmosClient); + this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); + this.ConversationId = Throw.IfNullOrWhitespace(conversationId); + this.DatabaseId = databaseId; + this.ContainerId = containerId; + this._ownsClient = ownsClient; + + // Initialize partitioning mode + this._tenantId = tenantId; + this._userId = userId; + this._useHierarchicalPartitioning = tenantId != null && userId != null; + + this._partitionKey = this._useHierarchicalPartitioning + ? new PartitionKeyBuilder() + .Add(tenantId!) + .Add(userId!) + .Add(conversationId) + .Build() + : new PartitionKey(conversationId); + } + + /// + /// Initializes a new instance of the class using a connection string. + /// + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string connectionString, string databaseId, string containerId) + : this(connectionString, databaseId, containerId, Guid.NewGuid().ToString("N")) + { + } + + /// + /// Initializes a new instance of the class using a connection string. + /// + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The unique identifier for this conversation thread. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string connectionString, string databaseId, string containerId, string conversationId) + : this(new CosmosClient(Throw.IfNullOrWhitespace(connectionString)), databaseId, containerId, conversationId, ownsClient: true) + { + } + + /// + /// Initializes a new instance of the class using TokenCredential for authentication. + /// + /// The Cosmos DB account endpoint URI. + /// The TokenCredential to use for authentication (e.g., DefaultAzureCredential, ManagedIdentityCredential). + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId) + : this(accountEndpoint, tokenCredential, databaseId, containerId, Guid.NewGuid().ToString("N")) + { + } + + /// + /// Initializes a new instance of the class using a TokenCredential for authentication. + /// + /// The Cosmos DB account endpoint URI. + /// The TokenCredential to use for authentication (e.g., DefaultAzureCredential, ManagedIdentityCredential). + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The unique identifier for this conversation thread. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId, string conversationId) + : this(new CosmosClient(Throw.IfNullOrWhitespace(accountEndpoint), Throw.IfNull(tokenCredential)), databaseId, containerId, conversationId, ownsClient: true) + { + } + + /// + /// Initializes a new instance of the class using an existing . + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(CosmosClient cosmosClient, string databaseId, string containerId) + : this(cosmosClient, databaseId, containerId, Guid.NewGuid().ToString("N")) + { + } + + /// + /// Initializes a new instance of the class using an existing . + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The unique identifier for this conversation thread. + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(CosmosClient cosmosClient, string databaseId, string containerId, string conversationId) + : this(cosmosClient, databaseId, containerId, conversationId, ownsClient: false) + { + } + + /// + /// Initializes a new instance of the class using a connection string with hierarchical partition keys. + /// + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The tenant identifier for hierarchical partitioning. + /// The user identifier for hierarchical partitioning. + /// The session identifier for hierarchical partitioning. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string connectionString, string databaseId, string containerId, string tenantId, string userId, string sessionId) + : this(new CosmosClient(Throw.IfNullOrWhitespace(connectionString)), databaseId, containerId, Throw.IfNullOrWhitespace(sessionId), ownsClient: true, Throw.IfNullOrWhitespace(tenantId), Throw.IfNullOrWhitespace(userId)) + { + } + + /// + /// Initializes a new instance of the class using a TokenCredential for authentication with hierarchical partition keys. + /// + /// The Cosmos DB account endpoint URI. + /// The TokenCredential to use for authentication (e.g., DefaultAzureCredential, ManagedIdentityCredential). + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The tenant identifier for hierarchical partitioning. + /// The user identifier for hierarchical partitioning. + /// The session identifier for hierarchical partitioning. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId, string tenantId, string userId, string sessionId) + : this(new CosmosClient(Throw.IfNullOrWhitespace(accountEndpoint), Throw.IfNull(tokenCredential)), databaseId, containerId, Throw.IfNullOrWhitespace(sessionId), ownsClient: true, Throw.IfNullOrWhitespace(tenantId), Throw.IfNullOrWhitespace(userId)) + { + } + + /// + /// Initializes a new instance of the class using an existing with hierarchical partition keys. + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The tenant identifier for hierarchical partitioning. + /// The user identifier for hierarchical partitioning. + /// The session identifier for hierarchical partitioning. + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosChatMessageStore(CosmosClient cosmosClient, string databaseId, string containerId, string tenantId, string userId, string sessionId) + : this(cosmosClient, databaseId, containerId, Throw.IfNullOrWhitespace(sessionId), ownsClient: false, Throw.IfNullOrWhitespace(tenantId), Throw.IfNullOrWhitespace(userId)) + { + } + + /// + /// Creates a new instance of the class from previously serialized state. + /// + /// The instance to use for Cosmos DB operations. + /// A representing the serialized state of the message store. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Optional settings for customizing the JSON deserialization process. + /// A new instance of initialized from the serialized state. + /// Thrown when is null. + /// Thrown when the serialized state cannot be deserialized. + public static CosmosChatMessageStore CreateFromSerializedState(CosmosClient cosmosClient, JsonElement serializedStoreState, string databaseId, string containerId, JsonSerializerOptions? jsonSerializerOptions = null) + { + Throw.IfNull(cosmosClient); + Throw.IfNullOrWhitespace(databaseId); + Throw.IfNullOrWhitespace(containerId); + + if (serializedStoreState.ValueKind is not JsonValueKind.Object) + { + throw new ArgumentException("Invalid serialized state", nameof(serializedStoreState)); + } + + var state = JsonSerializer.Deserialize(serializedStoreState, jsonSerializerOptions); + if (state?.ConversationIdentifier is not { } conversationId) + { + throw new ArgumentException("Invalid serialized state", nameof(serializedStoreState)); + } + + // Use the internal constructor with all parameters to ensure partition key logic is centralized + return state.UseHierarchicalPartitioning && state.TenantId != null && state.UserId != null + ? new CosmosChatMessageStore(cosmosClient, databaseId, containerId, conversationId, ownsClient: false, state.TenantId, state.UserId) + : new CosmosChatMessageStore(cosmosClient, databaseId, containerId, conversationId, ownsClient: false); + } + + /// + public override async Task> GetMessagesAsync(CancellationToken cancellationToken = default) + { +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + // Fetch most recent messages in descending order when limit is set, then reverse to ascending + var orderDirection = this.MaxMessagesToRetrieve.HasValue ? "DESC" : "ASC"; + var query = new QueryDefinition($"SELECT * FROM c WHERE c.conversationId = @conversationId AND c.type = @type ORDER BY c.timestamp {orderDirection}") + .WithParameter("@conversationId", this.ConversationId) + .WithParameter("@type", "ChatMessage"); + + var iterator = this._container.GetItemQueryIterator(query, requestOptions: new QueryRequestOptions + { + PartitionKey = this._partitionKey, + MaxItemCount = this.MaxItemCount // Configurable query performance + }); + + var messages = new List(); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(cancellationToken).ConfigureAwait(false); + + foreach (var document in response) + { + if (this.MaxMessagesToRetrieve.HasValue && messages.Count >= this.MaxMessagesToRetrieve.Value) + { + break; + } + + if (!string.IsNullOrEmpty(document.Message)) + { + var message = JsonSerializer.Deserialize(document.Message, s_defaultJsonOptions); + if (message != null) + { + messages.Add(message); + } + } + } + + if (this.MaxMessagesToRetrieve.HasValue && messages.Count >= this.MaxMessagesToRetrieve.Value) + { + break; + } + } + + // If we fetched in descending order (most recent first), reverse to ascending order + if (this.MaxMessagesToRetrieve.HasValue) + { + messages.Reverse(); + } + + return messages; + } + + /// + public override async Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) + { + if (messages is null) + { + throw new ArgumentNullException(nameof(messages)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var messageList = messages as IReadOnlyCollection ?? messages.ToList(); + if (messageList.Count == 0) + { + return; + } + + // Use transactional batch for atomic operations + if (messageList.Count > 1) + { + await this.AddMessagesInBatchAsync(messageList, cancellationToken).ConfigureAwait(false); + } + else + { + await this.AddSingleMessageAsync(messageList.First(), cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Adds multiple messages using transactional batch operations for atomicity. + /// + private async Task AddMessagesInBatchAsync(IReadOnlyCollection messages, CancellationToken cancellationToken) + { + var currentTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + // Process messages in optimal batch sizes + for (int i = 0; i < messages.Count; i += this.MaxBatchSize) + { + var batchMessages = messages.Skip(i).Take(this.MaxBatchSize).ToList(); + await this.ExecuteBatchOperationAsync(batchMessages, currentTimestamp, cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Executes a single batch operation with enhanced error handling. + /// Cosmos SDK handles throttling (429) retries automatically. + /// + private async Task ExecuteBatchOperationAsync(List messages, long timestamp, CancellationToken cancellationToken) + { + // Create all documents upfront for validation and batch operation + var documents = new List(messages.Count); + foreach (var message in messages) + { + documents.Add(this.CreateMessageDocument(message, timestamp)); + } + + // Defensive check: Verify all messages share the same partition key values + // In hierarchical partitioning, this means same tenantId, userId, and sessionId + // In simple partitioning, this means same conversationId + if (documents.Count > 0) + { + if (this._useHierarchicalPartitioning) + { + // Verify all documents have matching hierarchical partition key components + var firstDoc = documents[0]; + if (!documents.All(d => d.TenantId == firstDoc.TenantId && d.UserId == firstDoc.UserId && d.SessionId == firstDoc.SessionId)) + { + throw new InvalidOperationException("All messages in a batch must share the same partition key values (tenantId, userId, sessionId)."); + } + } + else + { + // Verify all documents have matching conversationId + var firstConversationId = documents[0].ConversationId; + if (!documents.All(d => d.ConversationId == firstConversationId)) + { + throw new InvalidOperationException("All messages in a batch must share the same partition key value (conversationId)."); + } + } + } + + // All messages in this store share the same partition key by design + // Transactional batches require all items to share the same partition key + var batch = this._container.CreateTransactionalBatch(this._partitionKey); + + foreach (var document in documents) + { + batch.CreateItem(document); + } + + try + { + var response = await batch.ExecuteAsync(cancellationToken).ConfigureAwait(false); + if (!response.IsSuccessStatusCode) + { + throw new InvalidOperationException($"Batch operation failed with status: {response.StatusCode}. Details: {response.ErrorMessage}"); + } + } + catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.RequestEntityTooLarge) + { + // If batch is too large, split into smaller batches + if (messages.Count == 1) + { + // Can't split further, use single operation + await this.AddSingleMessageAsync(messages[0], cancellationToken).ConfigureAwait(false); + return; + } + + // Split the batch in half and retry + var midpoint = messages.Count / 2; + var firstHalf = messages.Take(midpoint).ToList(); + var secondHalf = messages.Skip(midpoint).ToList(); + + await this.ExecuteBatchOperationAsync(firstHalf, timestamp, cancellationToken).ConfigureAwait(false); + await this.ExecuteBatchOperationAsync(secondHalf, timestamp, cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Adds a single message to the store. + /// + private async Task AddSingleMessageAsync(ChatMessage message, CancellationToken cancellationToken) + { + var document = this.CreateMessageDocument(message, DateTimeOffset.UtcNow.ToUnixTimeSeconds()); + + try + { + await this._container.CreateItemAsync(document, this._partitionKey, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.RequestEntityTooLarge) + { + throw new InvalidOperationException( + "Message exceeds Cosmos DB's maximum item size limit of 2MB. " + + "Message ID: " + message.MessageId + ", Serialized size is too large. " + + "Consider reducing message content or splitting into smaller messages.", + ex); + } + } + + /// + /// Creates a message document with enhanced metadata. + /// + private CosmosMessageDocument CreateMessageDocument(ChatMessage message, long timestamp) + { + return new CosmosMessageDocument + { + Id = Guid.NewGuid().ToString(), + ConversationId = this.ConversationId, + Timestamp = timestamp, + MessageId = message.MessageId, + Role = message.Role.Value, + Message = JsonSerializer.Serialize(message, s_defaultJsonOptions), + Type = "ChatMessage", // Type discriminator + Ttl = this.MessageTtlSeconds, // Configurable TTL + // Include hierarchical metadata when using hierarchical partitioning + TenantId = this._useHierarchicalPartitioning ? this._tenantId : null, + UserId = this._useHierarchicalPartitioning ? this._userId : null, + SessionId = this._useHierarchicalPartitioning ? this.ConversationId : null + }; + } + + /// + public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) + { +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var state = new StoreState + { + ConversationIdentifier = this.ConversationId, + TenantId = this._tenantId, + UserId = this._userId, + UseHierarchicalPartitioning = this._useHierarchicalPartitioning + }; + + var options = jsonSerializerOptions ?? s_defaultJsonOptions; + return JsonSerializer.SerializeToElement(state, options); + } + + /// + /// Gets the count of messages in this conversation. + /// This is an additional utility method beyond the base contract. + /// + /// The cancellation token. + /// The number of messages in the conversation. + public async Task GetMessageCountAsync(CancellationToken cancellationToken = default) + { +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + // Efficient count query + var query = new QueryDefinition("SELECT VALUE COUNT(1) FROM c WHERE c.conversationId = @conversationId AND c.Type = @type") + .WithParameter("@conversationId", this.ConversationId) + .WithParameter("@type", "ChatMessage"); + + var iterator = this._container.GetItemQueryIterator(query, requestOptions: new QueryRequestOptions + { + PartitionKey = this._partitionKey + }); + + // COUNT queries always return a result + var response = await iterator.ReadNextAsync(cancellationToken).ConfigureAwait(false); + return response.FirstOrDefault(); + } + + /// + /// Deletes all messages in this conversation. + /// This is an additional utility method beyond the base contract. + /// + /// The cancellation token. + /// The number of messages deleted. + public async Task ClearMessagesAsync(CancellationToken cancellationToken = default) + { +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + // Batch delete for efficiency + var query = new QueryDefinition("SELECT VALUE c.id FROM c WHERE c.conversationId = @conversationId AND c.Type = @type") + .WithParameter("@conversationId", this.ConversationId) + .WithParameter("@type", "ChatMessage"); + + var iterator = this._container.GetItemQueryIterator(query, requestOptions: new QueryRequestOptions + { + PartitionKey = this._partitionKey, + MaxItemCount = this.MaxItemCount + }); + + var deletedCount = 0; + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(cancellationToken).ConfigureAwait(false); + var batch = this._container.CreateTransactionalBatch(this._partitionKey); + var batchItemCount = 0; + + foreach (var itemId in response) + { + if (!string.IsNullOrEmpty(itemId)) + { + batch.DeleteItem(itemId); + batchItemCount++; + deletedCount++; + } + } + + if (batchItemCount > 0) + { + await batch.ExecuteAsync(cancellationToken).ConfigureAwait(false); + } + } + + return deletedCount; + } + + /// + public void Dispose() + { + if (!this._disposed) + { + if (this._ownsClient) + { + this._cosmosClient?.Dispose(); + } + this._disposed = true; + } + } + + private sealed class StoreState + { + public string ConversationIdentifier { get; set; } = string.Empty; + public string? TenantId { get; set; } + public string? UserId { get; set; } + public bool UseHierarchicalPartitioning { get; set; } + } + + /// + /// Represents a document stored in Cosmos DB for chat messages. + /// + [SuppressMessage("Performance", "CA1812:Avoid uninstantiated internal classes", Justification = "Instantiated by Cosmos DB operations")] + private sealed class CosmosMessageDocument + { + [Newtonsoft.Json.JsonProperty("id")] + public string Id { get; set; } = string.Empty; + + [Newtonsoft.Json.JsonProperty("conversationId")] + public string ConversationId { get; set; } = string.Empty; + + [Newtonsoft.Json.JsonProperty("timestamp")] + public long Timestamp { get; set; } + + [Newtonsoft.Json.JsonProperty("messageId")] + public string? MessageId { get; set; } + + [Newtonsoft.Json.JsonProperty("role")] + public string? Role { get; set; } + + [Newtonsoft.Json.JsonProperty("message")] + public string Message { get; set; } = string.Empty; + + [Newtonsoft.Json.JsonProperty("type")] + public string Type { get; set; } = string.Empty; + + [Newtonsoft.Json.JsonProperty("ttl")] + public int? Ttl { get; set; } + + /// + /// Tenant ID for hierarchical partitioning scenarios (optional). + /// + [Newtonsoft.Json.JsonProperty("tenantId")] + public string? TenantId { get; set; } + + /// + /// User ID for hierarchical partitioning scenarios (optional). + /// + [Newtonsoft.Json.JsonProperty("userId")] + public string? UserId { get; set; } + + /// + /// Session ID for hierarchical partitioning scenarios (same as ConversationId for compatibility). + /// + [Newtonsoft.Json.JsonProperty("sessionId")] + public string? SessionId { get; set; } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosCheckpointStore.cs b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosCheckpointStore.cs new file mode 100644 index 0000000000..62987b1dfc --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosCheckpointStore.cs @@ -0,0 +1,279 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Azure.Core; +using Microsoft.Azure.Cosmos; +using Microsoft.Shared.Diagnostics; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Agents.AI.Workflows.Checkpointing; + +/// +/// Provides a Cosmos DB implementation of the abstract class. +/// +/// The type of objects to store as checkpoint values. +[RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] +[RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] +public class CosmosCheckpointStore : JsonCheckpointStore, IDisposable +{ + private readonly CosmosClient _cosmosClient; + private readonly Container _container; + private readonly bool _ownsClient; + private bool _disposed; + + /// + /// Initializes a new instance of the class using a connection string. + /// + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosCheckpointStore(string connectionString, string databaseId, string containerId) + { + var cosmosClientOptions = new CosmosClientOptions(); + + this._cosmosClient = new CosmosClient(Throw.IfNullOrWhitespace(connectionString), cosmosClientOptions); + this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); + this._ownsClient = true; + } + + /// + /// Initializes a new instance of the class using a TokenCredential for authentication. + /// + /// The Cosmos DB account endpoint URI. + /// The TokenCredential to use for authentication (e.g., DefaultAzureCredential, ManagedIdentityCredential). + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosCheckpointStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId) + { + var cosmosClientOptions = new CosmosClientOptions + { + SerializerOptions = new CosmosSerializationOptions + { + PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase + } + }; + + this._cosmosClient = new CosmosClient(Throw.IfNullOrWhitespace(accountEndpoint), Throw.IfNull(tokenCredential), cosmosClientOptions); + this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); + this._ownsClient = true; + } + + /// + /// Initializes a new instance of the class using an existing . + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + public CosmosCheckpointStore(CosmosClient cosmosClient, string databaseId, string containerId) + { + this._cosmosClient = Throw.IfNull(cosmosClient); + + this._container = this._cosmosClient.GetContainer(Throw.IfNullOrWhitespace(databaseId), Throw.IfNullOrWhitespace(containerId)); + this._ownsClient = false; + } + + /// + /// Gets the identifier of the Cosmos DB database. + /// + public string DatabaseId => this._container.Database.Id; + + /// + /// Gets the identifier of the Cosmos DB container. + /// + public string ContainerId => this._container.Id; + + /// + public override async ValueTask CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var checkpointId = Guid.NewGuid().ToString("N"); + var checkpointInfo = new CheckpointInfo(runId, checkpointId); + + var document = new CosmosCheckpointDocument + { + Id = $"{runId}_{checkpointId}", + RunId = runId, + CheckpointId = checkpointId, + Value = JToken.Parse(value.GetRawText()), + ParentCheckpointId = parent?.CheckpointId, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + }; + + await this._container.CreateItemAsync(document, new PartitionKey(runId)).ConfigureAwait(false); + return checkpointInfo; + } + + /// + public override async ValueTask RetrieveCheckpointAsync(string runId, CheckpointInfo key) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + + if (key is null) + { + throw new ArgumentNullException(nameof(key)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var id = $"{runId}_{key.CheckpointId}"; + + try + { + var response = await this._container.ReadItemAsync(id, new PartitionKey(runId)).ConfigureAwait(false); + using var document = JsonDocument.Parse(response.Resource.Value.ToString()); + return document.RootElement.Clone(); + } + catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound) + { + throw new InvalidOperationException($"Checkpoint with ID '{key.CheckpointId}' for run '{runId}' not found."); + } + } + + /// + public override async ValueTask> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + QueryDefinition query = withParent == null + ? new QueryDefinition("SELECT c.runId, c.checkpointId FROM c WHERE c.runId = @runId ORDER BY c.timestamp ASC") + .WithParameter("@runId", runId) + : new QueryDefinition("SELECT c.runId, c.checkpointId FROM c WHERE c.runId = @runId AND c.parentCheckpointId = @parentCheckpointId ORDER BY c.timestamp ASC") + .WithParameter("@runId", runId) + .WithParameter("@parentCheckpointId", withParent.CheckpointId); + + var iterator = this._container.GetItemQueryIterator(query); + var checkpoints = new List(); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync().ConfigureAwait(false); + checkpoints.AddRange(response.Select(r => new CheckpointInfo(r.RunId, r.CheckpointId))); + } + + return checkpoints; + } + + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by the and optionally releases the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!this._disposed) + { + if (disposing && this._ownsClient) + { + this._cosmosClient?.Dispose(); + } + this._disposed = true; + } + } + + /// + /// Represents a checkpoint document stored in Cosmos DB. + /// + internal sealed class CosmosCheckpointDocument + { + [JsonProperty("id")] + public string Id { get; set; } = string.Empty; + + [JsonProperty("runId")] + public string RunId { get; set; } = string.Empty; + + [JsonProperty("checkpointId")] + public string CheckpointId { get; set; } = string.Empty; + + [JsonProperty("value")] + public JToken Value { get; set; } = JValue.CreateNull(); + + [JsonProperty("parentCheckpointId")] + public string? ParentCheckpointId { get; set; } + + [JsonProperty("timestamp")] + public long Timestamp { get; set; } + } + + /// + /// Represents the result of a checkpoint query. + /// + [SuppressMessage("Performance", "CA1812:Avoid uninstantiated internal classes", Justification = "Instantiated by Cosmos DB query deserialization")] + private sealed class CheckpointQueryResult + { + public string RunId { get; set; } = string.Empty; + public string CheckpointId { get; set; } = string.Empty; + } +} + +/// +/// Provides a non-generic Cosmos DB implementation of the abstract class. +/// +[RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] +[RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] +public sealed class CosmosCheckpointStore : CosmosCheckpointStore +{ + /// + public CosmosCheckpointStore(string connectionString, string databaseId, string containerId) + : base(connectionString, databaseId, containerId) + { + } + + /// + public CosmosCheckpointStore(string accountEndpoint, TokenCredential tokenCredential, string databaseId, string containerId) + : base(accountEndpoint, tokenCredential, databaseId, containerId) + { + } + + /// + public CosmosCheckpointStore(CosmosClient cosmosClient, string databaseId, string containerId) + : base(cosmosClient, databaseId, containerId) + { + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBChatExtensions.cs b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBChatExtensions.cs new file mode 100644 index 0000000000..4e3b66fd54 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBChatExtensions.cs @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using Azure.Identity; +using Microsoft.Azure.Cosmos; + +namespace Microsoft.Agents.AI; + +/// +/// Provides extension methods for integrating Cosmos DB chat message storage with the Agent Framework. +/// +public static class CosmosDBChatExtensions +{ + /// + /// Configures the agent to use Cosmos DB for message storage with connection string authentication. + /// + /// The chat client agent options to configure. + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The configured . + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with NativeAOT.")] + public static ChatClientAgentOptions WithCosmosDBMessageStore( + this ChatClientAgentOptions options, + string connectionString, + string databaseId, + string containerId) + { + if (options is null) + { + throw new ArgumentNullException(nameof(options)); + } + + options.ChatMessageStoreFactory = context => new CosmosChatMessageStore(connectionString, databaseId, containerId); + return options; + } + + /// + /// Configures the agent to use Cosmos DB for message storage with managed identity authentication. + /// + /// The chat client agent options to configure. + /// The Cosmos DB account endpoint URI. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The configured . + /// Thrown when is null. + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with NativeAOT.")] + public static ChatClientAgentOptions WithCosmosDBMessageStoreUsingManagedIdentity( + this ChatClientAgentOptions options, + string accountEndpoint, + string databaseId, + string containerId) + { + if (options is null) + { + throw new ArgumentNullException(nameof(options)); + } + + options.ChatMessageStoreFactory = context => new CosmosChatMessageStore(accountEndpoint, new DefaultAzureCredential(), databaseId, containerId); + return options; + } + + /// + /// Configures the agent to use Cosmos DB for message storage with an existing . + /// + /// The chat client agent options to configure. + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// The configured . + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosChatMessageStore uses JSON serialization which is incompatible with NativeAOT.")] + public static ChatClientAgentOptions WithCosmosDBMessageStore( + this ChatClientAgentOptions options, + CosmosClient cosmosClient, + string databaseId, + string containerId) + { + if (options is null) + { + throw new ArgumentNullException(nameof(options)); + } + + options.ChatMessageStoreFactory = context => new CosmosChatMessageStore(cosmosClient, databaseId, containerId); + return options; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBWorkflowExtensions.cs b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBWorkflowExtensions.cs new file mode 100644 index 0000000000..9d8bc52e68 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/CosmosDBWorkflowExtensions.cs @@ -0,0 +1,218 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using Azure.Identity; +using Microsoft.Agents.AI.Workflows.Checkpointing; +using Microsoft.Azure.Cosmos; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides extension methods for integrating Cosmos DB checkpoint storage with the Agent Framework. +/// +public static class CosmosDBWorkflowExtensions +{ + /// + /// Creates a Cosmos DB checkpoint store using connection string authentication. + /// + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStore( + string connectionString, + string databaseId, + string containerId) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(connectionString)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(connectionString, databaseId, containerId); + } + + /// + /// Creates a Cosmos DB checkpoint store using managed identity authentication. + /// + /// The Cosmos DB account endpoint URI. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStoreUsingManagedIdentity( + string accountEndpoint, + string databaseId, + string containerId) + { + if (string.IsNullOrWhiteSpace(accountEndpoint)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(accountEndpoint)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(accountEndpoint, new DefaultAzureCredential(), databaseId, containerId); + } + + /// + /// Creates a Cosmos DB checkpoint store using an existing . + /// + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStore( + CosmosClient cosmosClient, + string databaseId, + string containerId) + { + if (cosmosClient is null) + { + throw new ArgumentNullException(nameof(cosmosClient)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(cosmosClient, databaseId, containerId); + } + + /// + /// Creates a generic Cosmos DB checkpoint store using connection string authentication. + /// + /// The type of objects to store as checkpoint values. + /// The Cosmos DB connection string. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStore( + string connectionString, + string databaseId, + string containerId) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(connectionString)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(connectionString, databaseId, containerId); + } + + /// + /// Creates a generic Cosmos DB checkpoint store using managed identity authentication. + /// + /// The type of objects to store as checkpoint values. + /// The Cosmos DB account endpoint URI. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStoreUsingManagedIdentity( + string accountEndpoint, + string databaseId, + string containerId) + { + if (string.IsNullOrWhiteSpace(accountEndpoint)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(accountEndpoint)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(accountEndpoint, new DefaultAzureCredential(), databaseId, containerId); + } + + /// + /// Creates a generic Cosmos DB checkpoint store using an existing . + /// + /// The type of objects to store as checkpoint values. + /// The instance to use for Cosmos DB operations. + /// The identifier of the Cosmos DB database. + /// The identifier of the Cosmos DB container. + /// A new instance of . + /// Thrown when any required parameter is null. + /// Thrown when any string parameter is null or whitespace. + [RequiresUnreferencedCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The CosmosCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static CosmosCheckpointStore CreateCheckpointStore( + CosmosClient cosmosClient, + string databaseId, + string containerId) + { + if (cosmosClient is null) + { + throw new ArgumentNullException(nameof(cosmosClient)); + } + + if (string.IsNullOrWhiteSpace(databaseId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(databaseId)); + } + + if (string.IsNullOrWhiteSpace(containerId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(containerId)); + } + + return new CosmosCheckpointStore(cosmosClient, databaseId, containerId); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/Microsoft.Agents.AI.CosmosNoSql.csproj b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/Microsoft.Agents.AI.CosmosNoSql.csproj new file mode 100644 index 0000000000..7e13ec5998 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.CosmosNoSql/Microsoft.Agents.AI.CosmosNoSql.csproj @@ -0,0 +1,41 @@ + + + + $(TargetFrameworksCore) + Microsoft.Agents.AI + $(NoWarn);MEAI001 + preview + + + + true + true + true + true + true + true + + + + + + + Microsoft Agent Framework Cosmos DB NoSQL Integration + Provides Cosmos DB NoSQL implementations for Microsoft Agent Framework storage abstractions including ChatMessageStore and CheckpointStore. + + + + + + + + + + + + + + + + + diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/.editorconfig b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/.editorconfig new file mode 100644 index 0000000000..83e05f582a --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/.editorconfig @@ -0,0 +1,9 @@ +# EditorConfig overrides for Cosmos DB Unit Tests +# Multi-targeting (net472 + net9.0) causes false positives for IDE0005 (unnecessary using directives) + +root = false + +[*.cs] +# Suppress IDE0005 for this project - multi-targeting causes false positives +# These using directives ARE necessary but appear unnecessary in one target framework +dotnet_diagnostic.IDE0005.severity = none diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosChatMessageStoreTests.cs b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosChatMessageStoreTests.cs new file mode 100644 index 0000000000..6f2a256206 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosChatMessageStoreTests.cs @@ -0,0 +1,760 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization.Metadata; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Identity; +using Microsoft.Agents.AI; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.AI; +using Xunit; + +namespace Microsoft.Agents.AI.CosmosNoSql.UnitTests; + +/// +/// Contains tests for . +/// +/// Test Modes: +/// - Default Mode: Cleans up all test data after each test run (deletes database) +/// - Preserve Mode: Keeps containers and data for inspection in Cosmos DB Emulator Data Explorer +/// +/// To enable Preserve Mode, set environment variable: COSMOS_PRESERVE_CONTAINERS=true +/// Example: $env:COSMOS_PRESERVE_CONTAINERS="true"; dotnet test +/// +/// In Preserve Mode, you can view the data in Cosmos DB Emulator Data Explorer at: +/// https://localhost:8081/_explorer/index.html +/// Database: AgentFrameworkTests +/// Container: ChatMessages +/// +/// Environment Variable Reference: +/// | Variable | Values | Description | +/// |----------|--------|-------------| +/// | COSMOS_PRESERVE_CONTAINERS | true / false | Controls whether to preserve test data after completion | +/// +/// Usage Examples: +/// - Run all tests in preserve mode: $env:COSMOS_PRESERVE_CONTAINERS="true"; dotnet test tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/ +/// - Run specific test category in preserve mode: $env:COSMOS_PRESERVE_CONTAINERS="true"; dotnet test tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/ --filter "Category=CosmosDB" +/// - Reset to cleanup mode: $env:COSMOS_PRESERVE_CONTAINERS=""; dotnet test tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/ +/// +[Collection("CosmosDB")] +public sealed class CosmosChatMessageStoreTests : IAsyncLifetime, IDisposable +{ + // Cosmos DB Emulator connection settings + private const string EmulatorEndpoint = "https://localhost:8081"; + private const string EmulatorKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + private const string TestContainerId = "ChatMessages"; + private const string HierarchicalTestContainerId = "HierarchicalChatMessages"; + // Use unique database ID per test class instance to avoid conflicts +#pragma warning disable CA1802 // Use literals where appropriate + private static readonly string s_testDatabaseId = $"AgentFrameworkTests-ChatStore-{Guid.NewGuid():N}"; +#pragma warning restore CA1802 + + private string _connectionString = string.Empty; + private bool _emulatorAvailable; + private bool _preserveContainer; + private CosmosClient? _setupClient; // Only used for test setup/cleanup + + public async Task InitializeAsync() + { + // Check environment variable to determine if we should preserve containers + // Set COSMOS_PRESERVE_CONTAINERS=true to keep containers and data for inspection + this._preserveContainer = string.Equals(Environment.GetEnvironmentVariable("COSMOS_PRESERVE_CONTAINERS"), "true", StringComparison.OrdinalIgnoreCase); + + this._connectionString = $"AccountEndpoint={EmulatorEndpoint};AccountKey={EmulatorKey}"; + + try + { + // Only create CosmosClient for test setup - the actual tests will use connection string constructors + this._setupClient = new CosmosClient(EmulatorEndpoint, EmulatorKey); + + // Test connection by attempting to create database + var databaseResponse = await this._setupClient.CreateDatabaseIfNotExistsAsync(s_testDatabaseId); + + // Create container for simple partitioning tests + await databaseResponse.Database.CreateContainerIfNotExistsAsync( + TestContainerId, + "/conversationId", + throughput: 400); + + // Create container for hierarchical partitioning tests with hierarchical partition key + var hierarchicalContainerProperties = new ContainerProperties(HierarchicalTestContainerId, new List { "/tenantId", "/userId", "/sessionId" }); + await databaseResponse.Database.CreateContainerIfNotExistsAsync( + hierarchicalContainerProperties, + throughput: 400); + + this._emulatorAvailable = true; + } + catch (Exception) + { + // Emulator not available, tests will be skipped + this._emulatorAvailable = false; + this._setupClient?.Dispose(); + this._setupClient = null; + } + } + + public async Task DisposeAsync() + { + if (this._setupClient != null && this._emulatorAvailable) + { + try + { + if (this._preserveContainer) + { + // Preserve mode: Don't delete the database/container, keep data for inspection + // This allows viewing data in the Cosmos DB Emulator Data Explorer + // No cleanup needed - data persists for debugging + } + else + { + // Clean mode: Delete the test database and all data + var database = this._setupClient.GetDatabase(s_testDatabaseId); + await database.DeleteAsync(); + } + } + catch (Exception ex) + { + // Ignore cleanup errors during test teardown + Console.WriteLine($"Warning: Cleanup failed: {ex.Message}"); + } + finally + { + this._setupClient.Dispose(); + } + } + } + + public void Dispose() + { + this._setupClient?.Dispose(); + GC.SuppressFinalize(this); + } + + private void SkipIfEmulatorNotAvailable() + { + // In CI: Skip if COSMOS_EMULATOR_AVAILABLE is not set to "true" + // Locally: Skip if emulator connection check failed + var ciEmulatorAvailable = string.Equals(Environment.GetEnvironmentVariable("COSMOS_EMULATOR_AVAILABLE"), "true", StringComparison.OrdinalIgnoreCase); + + Xunit.Skip.If(!ciEmulatorAvailable && !this._emulatorAvailable, "Cosmos DB Emulator is not available"); + } + + #region Constructor Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithConnectionString_ShouldCreateInstance() + { + // Arrange & Act + this.SkipIfEmulatorNotAvailable(); + + // Act + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, "test-conversation"); + + // Assert + Assert.NotNull(store); + Assert.Equal("test-conversation", store.ConversationId); + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(TestContainerId, store.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithConnectionStringNoConversationId_ShouldCreateInstance() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + + // Act + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId); + + // Assert + Assert.NotNull(store); + Assert.NotNull(store.ConversationId); + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(TestContainerId, store.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithNullConnectionString_ShouldThrowArgumentException() + { + // Arrange & Act & Assert + Assert.Throws(() => + new CosmosChatMessageStore((string)null!, s_testDatabaseId, TestContainerId, "test-conversation")); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithEmptyConversationId_ShouldThrowArgumentException() + { + // Arrange & Act & Assert + this.SkipIfEmulatorNotAvailable(); + + Assert.Throws(() => + new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, "")); + } + + #endregion + + #region AddMessagesAsync Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task AddMessagesAsync_WithSingleMessage_ShouldAddMessageAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var conversationId = Guid.NewGuid().ToString(); + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversationId); + var message = new ChatMessage(ChatRole.User, "Hello, world!"); + + // Act + await store.AddMessagesAsync([message]); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Assert + var messages = await store.GetMessagesAsync(); + var messageList = messages.ToList(); + + // Simple assertion - if this fails, we know the deserialization is the issue + if (messageList.Count == 0) + { + // Let's check if we can find ANY items in the container for this conversation + var directQuery = new QueryDefinition("SELECT VALUE COUNT(1) FROM c WHERE c.conversationId = @conversationId") + .WithParameter("@conversationId", conversationId); + var countIterator = this._setupClient!.GetDatabase(s_testDatabaseId).GetContainer(TestContainerId) + .GetItemQueryIterator(directQuery, requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(conversationId) + }); + + var countResponse = await countIterator.ReadNextAsync(); + var count = countResponse.FirstOrDefault(); + + // Debug: Let's see what the raw query returns + var rawQuery = new QueryDefinition("SELECT * FROM c WHERE c.conversationId = @conversationId") + .WithParameter("@conversationId", conversationId); + var rawIterator = this._setupClient!.GetDatabase(s_testDatabaseId).GetContainer(TestContainerId) + .GetItemQueryIterator(rawQuery, requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(conversationId) + }); + + List rawResults = new(); + while (rawIterator.HasMoreResults) + { + var rawResponse = await rawIterator.ReadNextAsync(); + rawResults.AddRange(rawResponse); + } + + string rawJson = rawResults.Count > 0 ? Newtonsoft.Json.JsonConvert.SerializeObject(rawResults[0], Newtonsoft.Json.Formatting.Indented) : "null"; + Assert.Fail($"GetMessagesAsync returned 0 messages, but direct count query found {count} items for conversation {conversationId}. Raw document: {rawJson}"); + } + + Assert.Single(messageList); + Assert.Equal("Hello, world!", messageList[0].Text); + Assert.Equal(ChatRole.User, messageList[0].Role); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task AddMessagesAsync_WithMultipleMessages_ShouldAddAllMessagesAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var conversationId = Guid.NewGuid().ToString(); + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversationId); + var messages = new[] + { + new ChatMessage(ChatRole.User, "First message"), + new ChatMessage(ChatRole.Assistant, "Second message"), + new ChatMessage(ChatRole.User, "Third message") + }; + + // Act + await store.AddMessagesAsync(messages); + + // Assert + var retrievedMessages = await store.GetMessagesAsync(); + var messageList = retrievedMessages.ToList(); + Assert.Equal(3, messageList.Count); + Assert.Equal("First message", messageList[0].Text); + Assert.Equal("Second message", messageList[1].Text); + Assert.Equal("Third message", messageList[2].Text); + } + + #endregion + + #region GetMessagesAsync Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task GetMessagesAsync_WithNoMessages_ShouldReturnEmptyAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, Guid.NewGuid().ToString()); + + // Act + var messages = await store.GetMessagesAsync(); + + // Assert + Assert.Empty(messages); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task GetMessagesAsync_WithConversationIsolation_ShouldOnlyReturnMessagesForConversationAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var conversation1 = Guid.NewGuid().ToString(); + var conversation2 = Guid.NewGuid().ToString(); + + using var store1 = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversation1); + using var store2 = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversation2); + + await store1.AddMessagesAsync([new ChatMessage(ChatRole.User, "Message for conversation 1")]); + await store2.AddMessagesAsync([new ChatMessage(ChatRole.User, "Message for conversation 2")]); + + // Act + var messages1 = await store1.GetMessagesAsync(); + var messages2 = await store2.GetMessagesAsync(); + + // Assert + var messageList1 = messages1.ToList(); + var messageList2 = messages2.ToList(); + Assert.Single(messageList1); + Assert.Single(messageList2); + Assert.Equal("Message for conversation 1", messageList1[0].Text); + Assert.Equal("Message for conversation 2", messageList2[0].Text); + } + + #endregion + + #region Integration Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task FullWorkflow_AddAndGet_ShouldWorkCorrectlyAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var conversationId = $"test-conversation-{Guid.NewGuid():N}"; // Use unique conversation ID + using var originalStore = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversationId); + + var messages = new[] + { + new ChatMessage(ChatRole.System, "You are a helpful assistant."), + new ChatMessage(ChatRole.User, "Hello!"), + new ChatMessage(ChatRole.Assistant, "Hi there! How can I help you today?"), + new ChatMessage(ChatRole.User, "What's the weather like?"), + new ChatMessage(ChatRole.Assistant, "I'm sorry, I don't have access to current weather data.") + }; + + // Act 1: Add messages + await originalStore.AddMessagesAsync(messages); + + // Act 2: Verify messages were added + var retrievedMessages = await originalStore.GetMessagesAsync(); + var retrievedList = retrievedMessages.ToList(); + Assert.Equal(5, retrievedList.Count); + + // Act 3: Create new store instance for same conversation (test persistence) + using var newStore = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, conversationId); + var persistedMessages = await newStore.GetMessagesAsync(); + var persistedList = persistedMessages.ToList(); + + // Assert final state + Assert.Equal(5, persistedList.Count); + Assert.Equal("You are a helpful assistant.", persistedList[0].Text); + Assert.Equal("Hello!", persistedList[1].Text); + Assert.Equal("Hi there! How can I help you today?", persistedList[2].Text); + Assert.Equal("What's the weather like?", persistedList[3].Text); + Assert.Equal("I'm sorry, I don't have access to current weather data.", persistedList[4].Text); + } + + #endregion + + #region Disposal Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Dispose_AfterUse_ShouldNotThrow() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, Guid.NewGuid().ToString()); + + // Act & Assert + store.Dispose(); // Should not throw + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Dispose_MultipleCalls_ShouldNotThrow() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, Guid.NewGuid().ToString()); + + // Act & Assert + store.Dispose(); // First call + store.Dispose(); // Second call - should not throw + } + + #endregion + + #region Hierarchical Partitioning Tests + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalConnectionString_ShouldCreateInstance() + { + // Arrange & Act + this.SkipIfEmulatorNotAvailable(); + + // Act + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, "tenant-123", "user-456", "session-789"); + + // Assert + Assert.NotNull(store); + Assert.Equal("session-789", store.ConversationId); + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(HierarchicalTestContainerId, store.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalEndpoint_ShouldCreateInstance() + { + // Arrange & Act + this.SkipIfEmulatorNotAvailable(); + + // Act + TokenCredential credential = new DefaultAzureCredential(); + using var store = new CosmosChatMessageStore(EmulatorEndpoint, credential, s_testDatabaseId, HierarchicalTestContainerId, "tenant-123", "user-456", "session-789"); + + // Assert + Assert.NotNull(store); + Assert.Equal("session-789", store.ConversationId); + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(HierarchicalTestContainerId, store.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalCosmosClient_ShouldCreateInstance() + { + // Arrange & Act + this.SkipIfEmulatorNotAvailable(); + + using var cosmosClient = new CosmosClient(EmulatorEndpoint, EmulatorKey); + using var store = new CosmosChatMessageStore(cosmosClient, s_testDatabaseId, HierarchicalTestContainerId, "tenant-123", "user-456", "session-789"); + + // Assert + Assert.NotNull(store); + Assert.Equal("session-789", store.ConversationId); + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(HierarchicalTestContainerId, store.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalNullTenantId_ShouldThrowArgumentException() + { + // Arrange & Act & Assert + this.SkipIfEmulatorNotAvailable(); + + Assert.Throws(() => + new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, null!, "user-456", "session-789")); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalEmptyUserId_ShouldThrowArgumentException() + { + // Arrange & Act & Assert + this.SkipIfEmulatorNotAvailable(); + + Assert.Throws(() => + new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, "tenant-123", "", "session-789")); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public void Constructor_WithHierarchicalWhitespaceSessionId_ShouldThrowArgumentException() + { + // Arrange & Act & Assert + this.SkipIfEmulatorNotAvailable(); + + Assert.Throws(() => + new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, "tenant-123", "user-456", " ")); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task AddMessagesAsync_WithHierarchicalPartitioning_ShouldAddMessageWithMetadataAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string TenantId = "tenant-123"; + const string UserId = "user-456"; + const string SessionId = "session-789"; + // Test hierarchical partitioning constructor with connection string + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, TenantId, UserId, SessionId); + var message = new ChatMessage(ChatRole.User, "Hello from hierarchical partitioning!"); + + // Act + await store.AddMessagesAsync([message]); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Assert + var messages = await store.GetMessagesAsync(); + var messageList = messages.ToList(); + + Assert.Single(messageList); + Assert.Equal("Hello from hierarchical partitioning!", messageList[0].Text); + Assert.Equal(ChatRole.User, messageList[0].Role); + + // Verify that the document is stored with hierarchical partitioning metadata + var directQuery = new QueryDefinition("SELECT * FROM c WHERE c.conversationId = @conversationId AND c.type = @type") + .WithParameter("@conversationId", SessionId) + .WithParameter("@type", "ChatMessage"); + + var iterator = this._setupClient!.GetDatabase(s_testDatabaseId).GetContainer(HierarchicalTestContainerId) + .GetItemQueryIterator(directQuery, requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKeyBuilder().Add(TenantId).Add(UserId).Add(SessionId).Build() + }); + + var response = await iterator.ReadNextAsync(); + var document = response.FirstOrDefault(); + + Assert.NotNull(document); + // The document should have hierarchical metadata + Assert.Equal(SessionId, (string)document!.conversationId); + Assert.Equal(TenantId, (string)document!.tenantId); + Assert.Equal(UserId, (string)document!.userId); + Assert.Equal(SessionId, (string)document!.sessionId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task AddMessagesAsync_WithHierarchicalMultipleMessages_ShouldAddAllMessagesAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string TenantId = "tenant-batch"; + const string UserId = "user-batch"; + const string SessionId = "session-batch"; + // Test hierarchical partitioning constructor with connection string + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, TenantId, UserId, SessionId); + var messages = new[] + { + new ChatMessage(ChatRole.User, "First hierarchical message"), + new ChatMessage(ChatRole.Assistant, "Second hierarchical message"), + new ChatMessage(ChatRole.User, "Third hierarchical message") + }; + + // Act + await store.AddMessagesAsync(messages); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Assert + var retrievedMessages = await store.GetMessagesAsync(); + var messageList = retrievedMessages.ToList(); + + Assert.Equal(3, messageList.Count); + Assert.Equal("First hierarchical message", messageList[0].Text); + Assert.Equal("Second hierarchical message", messageList[1].Text); + Assert.Equal("Third hierarchical message", messageList[2].Text); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task GetMessagesAsync_WithHierarchicalPartitionIsolation_ShouldIsolateMessagesByUserIdAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string TenantId = "tenant-isolation"; + const string UserId1 = "user-1"; + const string UserId2 = "user-2"; + const string SessionId = "session-isolation"; + + // Different userIds create different hierarchical partitions, providing proper isolation + using var store1 = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, TenantId, UserId1, SessionId); + using var store2 = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, TenantId, UserId2, SessionId); + + // Add messages to both stores + await store1.AddMessagesAsync([new ChatMessage(ChatRole.User, "Message from user 1")]); + await store2.AddMessagesAsync([new ChatMessage(ChatRole.User, "Message from user 2")]); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Act & Assert + var messages1 = await store1.GetMessagesAsync(); + var messageList1 = messages1.ToList(); + + var messages2 = await store2.GetMessagesAsync(); + var messageList2 = messages2.ToList(); + + // With true hierarchical partitioning, each user sees only their own messages + Assert.Single(messageList1); + Assert.Single(messageList2); + Assert.Equal("Message from user 1", messageList1[0].Text); + Assert.Equal("Message from user 2", messageList2[0].Text); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task SerializeDeserialize_WithHierarchicalPartitioning_ShouldPreserveStateAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string TenantId = "tenant-serialize"; + const string UserId = "user-serialize"; + const string SessionId = "session-serialize"; + + using var originalStore = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, TenantId, UserId, SessionId); + await originalStore.AddMessagesAsync([new ChatMessage(ChatRole.User, "Test serialization message")]); + + // Act - Serialize the store state + var serializedState = originalStore.Serialize(); + + // Create a new store from the serialized state + using var cosmosClient = new CosmosClient(EmulatorEndpoint, EmulatorKey); + var serializerOptions = new JsonSerializerOptions + { + TypeInfoResolver = new DefaultJsonTypeInfoResolver() + }; + using var deserializedStore = CosmosChatMessageStore.CreateFromSerializedState(cosmosClient, serializedState, s_testDatabaseId, HierarchicalTestContainerId, serializerOptions); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Assert - The deserialized store should have the same functionality + var messages = await deserializedStore.GetMessagesAsync(); + var messageList = messages.ToList(); + + Assert.Single(messageList); + Assert.Equal("Test serialization message", messageList[0].Text); + Assert.Equal(SessionId, deserializedStore.ConversationId); + Assert.Equal(s_testDatabaseId, deserializedStore.DatabaseId); + Assert.Equal(HierarchicalTestContainerId, deserializedStore.ContainerId); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task HierarchicalAndSimplePartitioning_ShouldCoexistAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string SessionId = "coexist-session"; + + // Create simple store using simple partitioning container and hierarchical store using hierarchical container + using var simpleStore = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, SessionId); + using var hierarchicalStore = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, HierarchicalTestContainerId, "tenant-coexist", "user-coexist", SessionId); + + // Add messages to both + await simpleStore.AddMessagesAsync([new ChatMessage(ChatRole.User, "Simple partitioning message")]); + await hierarchicalStore.AddMessagesAsync([new ChatMessage(ChatRole.User, "Hierarchical partitioning message")]); + + // Wait a moment for eventual consistency + await Task.Delay(100); + + // Act & Assert + var simpleMessages = await simpleStore.GetMessagesAsync(); + var simpleMessageList = simpleMessages.ToList(); + + var hierarchicalMessages = await hierarchicalStore.GetMessagesAsync(); + var hierarchicalMessageList = hierarchicalMessages.ToList(); + + // Each should only see its own messages since they use different containers + Assert.Single(simpleMessageList); + Assert.Single(hierarchicalMessageList); + Assert.Equal("Simple partitioning message", simpleMessageList[0].Text); + Assert.Equal("Hierarchical partitioning message", hierarchicalMessageList[0].Text); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task MaxMessagesToRetrieve_ShouldLimitAndReturnMostRecentAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string ConversationId = "max-messages-test"; + + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, ConversationId); + + // Add 10 messages + var messages = new List(); + for (int i = 1; i <= 10; i++) + { + messages.Add(new ChatMessage(ChatRole.User, $"Message {i}")); + await Task.Delay(10); // Small delay to ensure different timestamps + } + await store.AddMessagesAsync(messages); + + // Wait for eventual consistency + await Task.Delay(100); + + // Act - Set max to 5 and retrieve + store.MaxMessagesToRetrieve = 5; + var retrievedMessages = await store.GetMessagesAsync(); + var messageList = retrievedMessages.ToList(); + + // Assert - Should get the 5 most recent messages (6-10) in ascending order + Assert.Equal(5, messageList.Count); + Assert.Equal("Message 6", messageList[0].Text); + Assert.Equal("Message 7", messageList[1].Text); + Assert.Equal("Message 8", messageList[2].Text); + Assert.Equal("Message 9", messageList[3].Text); + Assert.Equal("Message 10", messageList[4].Text); + } + + [SkippableFact] + [Trait("Category", "CosmosDB")] + public async Task MaxMessagesToRetrieve_Null_ShouldReturnAllMessagesAsync() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + const string ConversationId = "max-messages-null-test"; + + using var store = new CosmosChatMessageStore(this._connectionString, s_testDatabaseId, TestContainerId, ConversationId); + + // Add 10 messages + var messages = new List(); + for (int i = 1; i <= 10; i++) + { + messages.Add(new ChatMessage(ChatRole.User, $"Message {i}")); + } + await store.AddMessagesAsync(messages); + + // Wait for eventual consistency + await Task.Delay(100); + + // Act - No limit set (default null) + var retrievedMessages = await store.GetMessagesAsync(); + var messageList = retrievedMessages.ToList(); + + // Assert - Should get all 10 messages + Assert.Equal(10, messageList.Count); + Assert.Equal("Message 1", messageList[0].Text); + Assert.Equal("Message 10", messageList[9].Text); + } + + #endregion +} diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosCheckpointStoreTests.cs b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosCheckpointStoreTests.cs new file mode 100644 index 0000000000..dfa1f14221 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosCheckpointStoreTests.cs @@ -0,0 +1,454 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Agents.AI.Workflows.Checkpointing; +using Microsoft.Azure.Cosmos; +using Xunit; + +namespace Microsoft.Agents.AI.CosmosNoSql.UnitTests; + +/// +/// Contains tests for . +/// +/// Test Modes: +/// - Default Mode: Cleans up all test data after each test run (deletes database) +/// - Preserve Mode: Keeps containers and data for inspection in Cosmos DB Emulator Data Explorer +/// +/// To enable Preserve Mode, set environment variable: COSMOS_PRESERVE_CONTAINERS=true +/// Example: $env:COSMOS_PRESERVE_CONTAINERS="true"; dotnet test +/// +/// In Preserve Mode, you can view the data in Cosmos DB Emulator Data Explorer at: +/// https://localhost:8081/_explorer/index.html +/// Database: AgentFrameworkTests +/// Container: Checkpoints +/// +[Collection("CosmosDB")] +public class CosmosCheckpointStoreTests : IAsyncLifetime, IDisposable +{ + // Cosmos DB Emulator connection settings + private const string EmulatorEndpoint = "https://localhost:8081"; + private const string EmulatorKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + private const string TestContainerId = "Checkpoints"; + // Use unique database ID per test class instance to avoid conflicts +#pragma warning disable CA1802 // Use literals where appropriate + private static readonly string s_testDatabaseId = $"AgentFrameworkTests-CheckpointStore-{Guid.NewGuid():N}"; +#pragma warning restore CA1802 + + private string _connectionString = string.Empty; + private CosmosClient? _cosmosClient; + private Database? _database; + private bool _emulatorAvailable; + private bool _preserveContainer; + + // JsonSerializerOptions configured for .NET 9+ compatibility + private static readonly JsonSerializerOptions s_jsonOptions = CreateJsonOptions(); + + private static JsonSerializerOptions CreateJsonOptions() + { + var options = new JsonSerializerOptions(); +#if NET9_0_OR_GREATER + options.TypeInfoResolver = new System.Text.Json.Serialization.Metadata.DefaultJsonTypeInfoResolver(); +#endif + return options; + } + + public async Task InitializeAsync() + { + // Check environment variable to determine if we should preserve containers + // Set COSMOS_PRESERVE_CONTAINERS=true to keep containers and data for inspection + this._preserveContainer = string.Equals(Environment.GetEnvironmentVariable("COSMOS_PRESERVE_CONTAINERS"), "true", StringComparison.OrdinalIgnoreCase); + + this._connectionString = $"AccountEndpoint={EmulatorEndpoint};AccountKey={EmulatorKey}"; + + try + { + this._cosmosClient = new CosmosClient(EmulatorEndpoint, EmulatorKey); + + // Test connection by attempting to create database + this._database = await this._cosmosClient.CreateDatabaseIfNotExistsAsync(s_testDatabaseId); + await this._database.CreateContainerIfNotExistsAsync( + TestContainerId, + "/runId", + throughput: 400); + + this._emulatorAvailable = true; + } + catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException || ex is AccessViolationException)) + { + // Emulator not available, tests will be skipped + this._emulatorAvailable = false; + this._cosmosClient?.Dispose(); + this._cosmosClient = null; + } + } + + public async Task DisposeAsync() + { + if (this._cosmosClient != null && this._emulatorAvailable) + { + try + { + if (this._preserveContainer) + { + // Preserve mode: Don't delete the database/container, keep data for inspection + // This allows viewing data in the Cosmos DB Emulator Data Explorer + // No cleanup needed - data persists for debugging + } + else + { + // Clean mode: Delete the test database and all data + await this._database!.DeleteAsync(); + } + } + catch (Exception ex) + { + // Ignore cleanup errors, but log for diagnostics + Console.WriteLine($"[DisposeAsync] Cleanup error: {ex.Message}\n{ex.StackTrace}"); + } + finally + { + this._cosmosClient.Dispose(); + } + } + } + + private void SkipIfEmulatorNotAvailable() + { + // In CI: Skip if COSMOS_EMULATOR_AVAILABLE is not set to "true" + // Locally: Skip if emulator connection check failed + var ciEmulatorAvailable = string.Equals(Environment.GetEnvironmentVariable("COSMOS_EMULATOR_AVAILABLE"), "true", StringComparison.OrdinalIgnoreCase); + + Xunit.Skip.If(!ciEmulatorAvailable && !this._emulatorAvailable, "Cosmos DB Emulator is not available"); + } + + #region Constructor Tests + + [SkippableFact] + public void Constructor_WithCosmosClient_SetsProperties() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + + // Act + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + + // Assert + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(TestContainerId, store.ContainerId); + } + + [SkippableFact] + public void Constructor_WithConnectionString_SetsProperties() + { + // Arrange + this.SkipIfEmulatorNotAvailable(); + + // Act + using var store = new CosmosCheckpointStore(this._connectionString, s_testDatabaseId, TestContainerId); + + // Assert + Assert.Equal(s_testDatabaseId, store.DatabaseId); + Assert.Equal(TestContainerId, store.ContainerId); + } + + [SkippableFact] + public void Constructor_WithNullCosmosClient_ThrowsArgumentNullException() + { + // Act & Assert + Assert.Throws(() => + new CosmosCheckpointStore((CosmosClient)null!, s_testDatabaseId, TestContainerId)); + } + + [SkippableFact] + public void Constructor_WithNullConnectionString_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => + new CosmosCheckpointStore((string)null!, s_testDatabaseId, TestContainerId)); + } + + #endregion + + #region Checkpoint Operations Tests + + [SkippableFact] + public async Task CreateCheckpointAsync_NewCheckpoint_CreatesSuccessfullyAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test checkpoint" }, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Assert + Assert.NotNull(checkpointInfo); + Assert.Equal(runId, checkpointInfo.RunId); + Assert.NotNull(checkpointInfo.CheckpointId); + Assert.NotEmpty(checkpointInfo.CheckpointId); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_ExistingCheckpoint_ReturnsCorrectValueAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var originalData = new { message = "Hello, World!", timestamp = DateTimeOffset.UtcNow }; + var checkpointValue = JsonSerializer.SerializeToElement(originalData, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + var retrievedValue = await store.RetrieveCheckpointAsync(runId, checkpointInfo); + + // Assert + Assert.Equal(JsonValueKind.Object, retrievedValue.ValueKind); + Assert.True(retrievedValue.TryGetProperty("message", out var messageProp)); + Assert.Equal("Hello, World!", messageProp.GetString()); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_NonExistentCheckpoint_ThrowsInvalidOperationExceptionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var fakeCheckpointInfo = new CheckpointInfo(runId, "nonexistent-checkpoint"); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveCheckpointAsync(runId, fakeCheckpointInfo).AsTask()); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_EmptyStore_ReturnsEmptyCollectionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + + // Act + var index = await store.RetrieveIndexAsync(runId); + + // Assert + Assert.NotNull(index); + Assert.Empty(index); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_WithCheckpoints_ReturnsAllCheckpointsAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Create multiple checkpoints + var checkpoint1 = await store.CreateCheckpointAsync(runId, checkpointValue); + var checkpoint2 = await store.CreateCheckpointAsync(runId, checkpointValue); + var checkpoint3 = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Act + var index = (await store.RetrieveIndexAsync(runId)).ToList(); + + // Assert + Assert.Equal(3, index.Count); + Assert.Contains(index, c => c.CheckpointId == checkpoint1.CheckpointId); + Assert.Contains(index, c => c.CheckpointId == checkpoint2.CheckpointId); + Assert.Contains(index, c => c.CheckpointId == checkpoint3.CheckpointId); + } + + [SkippableFact] + public async Task CreateCheckpointAsync_WithParent_CreatesHierarchyAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var parentCheckpoint = await store.CreateCheckpointAsync(runId, checkpointValue); + var childCheckpoint = await store.CreateCheckpointAsync(runId, checkpointValue, parentCheckpoint); + + // Assert + Assert.NotEqual(parentCheckpoint.CheckpointId, childCheckpoint.CheckpointId); + Assert.Equal(runId, parentCheckpoint.RunId); + Assert.Equal(runId, childCheckpoint.RunId); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_WithParentFilter_ReturnsFilteredResultsAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Create parent and child checkpoints + var parent = await store.CreateCheckpointAsync(runId, checkpointValue); + var child1 = await store.CreateCheckpointAsync(runId, checkpointValue, parent); + var child2 = await store.CreateCheckpointAsync(runId, checkpointValue, parent); + + // Create an orphan checkpoint + var orphan = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Act + var allCheckpoints = (await store.RetrieveIndexAsync(runId)).ToList(); + var childrenOfParent = (await store.RetrieveIndexAsync(runId, parent)).ToList(); + + // Assert + Assert.Equal(4, allCheckpoints.Count); // parent + 2 children + orphan + Assert.Equal(2, childrenOfParent.Count); // only children + + Assert.Contains(childrenOfParent, c => c.CheckpointId == child1.CheckpointId); + Assert.Contains(childrenOfParent, c => c.CheckpointId == child2.CheckpointId); + Assert.DoesNotContain(childrenOfParent, c => c.CheckpointId == parent.CheckpointId); + Assert.DoesNotContain(childrenOfParent, c => c.CheckpointId == orphan.CheckpointId); + } + + #endregion + + #region Run Isolation Tests + + [SkippableFact] + public async Task CheckpointOperations_DifferentRuns_IsolatesDataAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId1 = Guid.NewGuid().ToString(); + var runId2 = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var checkpoint1 = await store.CreateCheckpointAsync(runId1, checkpointValue); + var checkpoint2 = await store.CreateCheckpointAsync(runId2, checkpointValue); + + var index1 = (await store.RetrieveIndexAsync(runId1)).ToList(); + var index2 = (await store.RetrieveIndexAsync(runId2)).ToList(); + + // Assert + Assert.Single(index1); + Assert.Single(index2); + Assert.Equal(checkpoint1.CheckpointId, index1[0].CheckpointId); + Assert.Equal(checkpoint2.CheckpointId, index2[0].CheckpointId); + Assert.NotEqual(checkpoint1.CheckpointId, checkpoint2.CheckpointId); + } + + #endregion + + #region Error Handling Tests + + [SkippableFact] + public async Task CreateCheckpointAsync_WithNullRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync(null!, checkpointValue).AsTask()); + } + + [SkippableFact] + public async Task CreateCheckpointAsync_WithEmptyRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync("", checkpointValue).AsTask()); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_WithNullCheckpointInfo_ThrowsArgumentNullExceptionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + using var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var runId = Guid.NewGuid().ToString(); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveCheckpointAsync(runId, null!).AsTask()); + } + + #endregion + + #region Disposal Tests + + [SkippableFact] + public async Task Dispose_AfterDisposal_ThrowsObjectDisposedExceptionAsync() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + store.Dispose(); + + // Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync("test-run", checkpointValue).AsTask()); + } + + [SkippableFact] + public void Dispose_MultipleCalls_DoesNotThrow() + { + this.SkipIfEmulatorNotAvailable(); + + // Arrange + var store = new CosmosCheckpointStore(this._cosmosClient!, s_testDatabaseId, TestContainerId); + + // Act & Assert (should not throw) + store.Dispose(); + store.Dispose(); + store.Dispose(); + } + + #endregion + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + this._cosmosClient?.Dispose(); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs new file mode 100644 index 0000000000..195c433de5 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Xunit; + +namespace Microsoft.Agents.AI.CosmosNoSql.UnitTests; + +/// +/// Defines a collection fixture for Cosmos DB tests to ensure they run sequentially. +/// This prevents race conditions and resource conflicts when tests create and delete +/// databases in the Cosmos DB Emulator. +/// +[CollectionDefinition("CosmosDB", DisableParallelization = true)] +public sealed class CosmosDBCollectionFixture +{ + // This class has no code, and is never created. Its purpose is simply + // to be the place to apply [CollectionDefinition] and all the + // ICollectionFixture<> interfaces. +} diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/Microsoft.Agents.AI.CosmosNoSql.UnitTests.csproj b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/Microsoft.Agents.AI.CosmosNoSql.UnitTests.csproj new file mode 100644 index 0000000000..d60418ee2c --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/Microsoft.Agents.AI.CosmosNoSql.UnitTests.csproj @@ -0,0 +1,24 @@ + + + + net10.0;net9.0 + $(NoWarn);MEAI001 + + + + false + + + + + + + + + + + + + + +