Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IRead
ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(result.Exception, context);
await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException);
}

// Prevent the change feed lease from being checkpointed if cancellation was requested
cancellationToken.ThrowIfCancellationRequested();
Comment thread
ealsur marked this conversation as resolved.
}

public IScaleMonitor GetMonitor()
Expand Down
72 changes: 72 additions & 0 deletions test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Extensions.Tests;
Expand Down Expand Up @@ -75,6 +76,35 @@ await TestHelpers.Await(() =>
}
}

[Fact]
public async Task CosmosDBEndToEndCancellation()
{
using (var host = await StartHostAsync(typeof(EndToEndCancellationTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

// Insert an item to ensure the function is triggered
var response = await client.GetContainer(DatabaseName, CollectionName).UpsertItemAsync<Item>(new Item() { Id = Guid.NewGuid().ToString() });

// Trigger cancellation by stopping the host after the function has been triggered
await TestHelpers.Await(() => _loggerProvider.GetAllLogMessages().Any(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")));
await host.StopAsync();
}

// Start the host again and wait for the logs to show the cancelled item was reprocessed
using (var host = await StartHostAsync(typeof(EndToEndCancellationTestClass)))
{
await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) > 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger canceled!")) == 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});
}
}

public static async Task<CosmosClient> InitializeDocumentClientAsync(IConfiguration configuration, string databaseName, string collectionName)
{
var client = new CosmosClient(configuration.GetConnectionStringOrSetting(Constants.DefaultConnectionStringName).Value);
Expand Down Expand Up @@ -194,5 +224,47 @@ public static void TriggerWithRetry(
}
}
}

private static class EndToEndCancellationTestClass
{
private static string firstDocumentId = null;

public static async Task Trigger(
[CosmosDBTrigger(
DatabaseName,
CollectionName,
CreateLeaseContainerIfNotExists = true,
LeaseContainerPrefix = "cancellation",
LeaseExpirationInterval = 20 * 1000,
LeaseRenewInterval = 5 * 1000,
FeedPollDelay = 500,
StartFromBeginning = true)]IReadOnlyList<Item> documents,
ILogger log,
CancellationToken cancellationToken)
{
log.LogInformation("Trigger called!");

if (firstDocumentId == null)
{
// The first time around, make a note of the first item's ID
firstDocumentId = documents[0].Id;
try
{
// Use a delay to simulate processing
await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
}
catch (TaskCanceledException)
{
log.LogWarning("Trigger canceled!");
throw;
}
}
else if (documents.Any(document => document.Id == firstDocumentId))
{
// Log a message if we see the first item from the earlier delay again
log.LogInformation("Saw the first document again!");
}
}
}
}
}