diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index db0ecacb32..60d2dd2c9f 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -59,6 +59,15 @@ public override async Task StartAsync() { if (!this.initialized) { + if (!this.changeFeedProcessorOptions.StartFromBeginning + && this.changeFeedProcessorOptions.StartTime == null + && string.IsNullOrEmpty(this.changeFeedProcessorOptions.StartContinuation)) + { + // StartTime is serialized as RFC1123 (seconds precision) and interpreted as exclusive. + // Back off by one second so writes occurring immediately after StartAsync are not missed. + this.changeFeedProcessorOptions.StartTime = DateTime.UtcNow.AddSeconds(-1); + } + await this.InitializeAsync().ConfigureAwait(false); } @@ -150,4 +159,4 @@ private PartitionManager BuildPartitionManager( return new PartitionManagerCore(bootstrapper, partitionController, partitionLoadBalancer); } } -} \ No newline at end of file +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicTests.cs index 02d2faac54..1ce5ddd56e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicTests.cs @@ -78,6 +78,44 @@ public async Task TestWithRunningProcessor() Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); } + [TestMethod] + public async Task TestWithRunningProcessor_ImmediateWriteAfterStart() + { + int partitionKey = 0; + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken token) => + { + await this.ValidateContextAsync(context); + processedDocCount += docs.Count(); + foreach (dynamic doc in docs) + { + accumulator += doc.id.ToString() + "."; + } + + if (processedDocCount >= 10) + { + allDocsProcessed.Set(); + } + }) + .WithInstanceName("random") + .WithLeaseContainer(this.LeaseContainer).Build(); + + await processor.StartAsync(); + foreach (int id in Enumerable.Range(0, 10)) + { + await this.Container.CreateItemAsync(new { id = id.ToString(), pk = partitionKey }); + } + + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); + } + [TestMethod] public async Task TestWithRunningProcessor_WithManualCheckpoint() { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs index d7b088484a..c6cab56274 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs @@ -770,4 +770,4 @@ public async Task HedgeTriggered_HedgeContextContainsMultipleRegions() "Multiple regions in HedgeContext confirms hedging was triggered."); } } -} \ No newline at end of file +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs index de6d7ddc7e..c439950182 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs @@ -131,6 +131,142 @@ public async Task StartAsync() } } + [TestMethod] + public async Task StartAsync_SetsStartTime_WhenNoStartOptionsProvided() + { + Mock leaseStore = new Mock(); + leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true); + + Mock leaseContainer = new Mock(); + leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); + + Mock leaseStoreManager = new Mock(); + leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); + leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); + leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); + + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); + ChangeFeedProcessorCore processor = null; + try + { + processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + processor.ApplyBuildConfiguration( + leaseStoreManager.Object, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + options, + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + await processor.StartAsync(); + + Assert.IsTrue(options.StartTime.HasValue); + Assert.AreEqual(DateTimeKind.Utc, options.StartTime.Value.Kind); + } + finally + { + if (processor != null) + { + await processor.StopAsync(); + } + } + } + + [TestMethod] + public async Task StartAsync_DoesNotOverrideExplicitStartTime() + { + Mock leaseStore = new Mock(); + leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true); + + Mock leaseContainer = new Mock(); + leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); + + Mock leaseStoreManager = new Mock(); + leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); + leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); + leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); + + DateTime explicitStartTime = DateTime.UtcNow.AddMinutes(-5); + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions + { + StartTime = explicitStartTime, + }; + + ChangeFeedProcessorCore processor = null; + try + { + processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + processor.ApplyBuildConfiguration( + leaseStoreManager.Object, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + options, + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + await processor.StartAsync(); + + Assert.AreEqual(explicitStartTime, options.StartTime); + } + finally + { + if (processor != null) + { + await processor.StopAsync(); + } + } + } + + [TestMethod] + public async Task StartAsync_DoesNotSetStartTime_WhenStartFromBeginning() + { + Mock leaseStore = new Mock(); + leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true); + + Mock leaseContainer = new Mock(); + leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); + + Mock leaseStoreManager = new Mock(); + leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); + leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); + leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); + + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions + { + StartFromBeginning = true, + }; + + ChangeFeedProcessorCore processor = null; + try + { + processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + processor.ApplyBuildConfiguration( + leaseStoreManager.Object, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + options, + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + await processor.StartAsync(); + + Assert.IsNull(options.StartTime); + } + finally + { + if (processor != null) + { + await processor.StopAsync(); + } + } + } + [TestMethod] public async Task ObserverIsCreated() {