Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public override async Task StartAsync()
{
if (!this.initialized)
{
if (!this.changeFeedProcessorOptions.StartFromBeginning
Comment thread
NaluTripician marked this conversation as resolved.
&& this.changeFeedProcessorOptions.StartTime == null
Comment thread
NaluTripician marked this conversation as resolved.
&& string.IsNullOrEmpty(this.changeFeedProcessorOptions.StartContinuation))
{
// StartTime is serialized as RFC1123 (seconds precision) and interpreted as exclusive.
// Back off by one second so writes occurring immediately after StartAsync are not missed.
this.changeFeedProcessorOptions.StartTime = DateTime.UtcNow.AddSeconds(-1);
}

await this.InitializeAsync().ConfigureAwait(false);
}

Expand Down Expand Up @@ -150,4 +159,4 @@ private PartitionManager BuildPartitionManager(
return new PartitionManagerCore(bootstrapper, partitionController, partitionLoadBalancer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,44 @@ public async Task TestWithRunningProcessor()
Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator);
}

[TestMethod]
public async Task TestWithRunningProcessor_ImmediateWriteAfterStart()
{
int partitionKey = 0;
ManualResetEvent allDocsProcessed = new ManualResetEvent(false);

int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
await this.ValidateContextAsync(context);
processedDocCount += docs.Count();
foreach (dynamic doc in docs)
{
accumulator += doc.id.ToString() + ".";
}

if (processedDocCount >= 10)
{
allDocsProcessed.Set();
}
})
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
foreach (int id in Enumerable.Range(0, 10))
{
await this.Container.CreateItemAsync<dynamic>(new { id = id.ToString(), pk = partitionKey });
}

bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
await processor.StopAsync();
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator);
Comment thread
NaluTripician marked this conversation as resolved.
}

[TestMethod]
public async Task TestWithRunningProcessor_WithManualCheckpoint()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,4 +770,4 @@ public async Task HedgeTriggered_HedgeContextContainsMultipleRegions()
"Multiple regions in HedgeContext confirms hedging was triggered.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,142 @@ public async Task StartAsync()
}
}

[TestMethod]
public async Task StartAsync_SetsStartTime_WhenNoStartOptionsProvided()
{
Mock<DocumentServiceLeaseStore> leaseStore = new Mock<DocumentServiceLeaseStore>();
leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true);

Mock<DocumentServiceLeaseContainer> leaseContainer = new Mock<DocumentServiceLeaseContainer>();
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty<DocumentServiceLease>()));
leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

Mock<DocumentServiceLeaseStoreManager> leaseStoreManager = new Mock<DocumentServiceLeaseStoreManager>();
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of<DocumentServiceLeaseManager>);
leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object);
leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of<DocumentServiceLeaseCheckpointer>);

ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
ChangeFeedProcessorCore processor = null;
try
{
processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
processor.ApplyBuildConfiguration(
leaseStoreManager.Object,
null,
"instanceName",
new ChangeFeedLeaseOptions(),
options,
ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

await processor.StartAsync();

Assert.IsTrue(options.StartTime.HasValue);
Assert.AreEqual(DateTimeKind.Utc, options.StartTime.Value.Kind);
}
finally
{
if (processor != null)
{
await processor.StopAsync();
}
}
}

[TestMethod]
public async Task StartAsync_DoesNotOverrideExplicitStartTime()
{
Mock<DocumentServiceLeaseStore> leaseStore = new Mock<DocumentServiceLeaseStore>();
leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true);

Mock<DocumentServiceLeaseContainer> leaseContainer = new Mock<DocumentServiceLeaseContainer>();
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty<DocumentServiceLease>()));
leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

Mock<DocumentServiceLeaseStoreManager> leaseStoreManager = new Mock<DocumentServiceLeaseStoreManager>();
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of<DocumentServiceLeaseManager>);
leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object);
leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of<DocumentServiceLeaseCheckpointer>);

DateTime explicitStartTime = DateTime.UtcNow.AddMinutes(-5);
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions
{
StartTime = explicitStartTime,
};

ChangeFeedProcessorCore processor = null;
try
{
processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
processor.ApplyBuildConfiguration(
leaseStoreManager.Object,
null,
"instanceName",
new ChangeFeedLeaseOptions(),
options,
ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

await processor.StartAsync();

Assert.AreEqual(explicitStartTime, options.StartTime);
Comment thread
NaluTripician marked this conversation as resolved.
}
finally
{
if (processor != null)
{
await processor.StopAsync();
}
}
}

[TestMethod]
public async Task StartAsync_DoesNotSetStartTime_WhenStartFromBeginning()
{
Mock<DocumentServiceLeaseStore> leaseStore = new Mock<DocumentServiceLeaseStore>();
leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true);

Mock<DocumentServiceLeaseContainer> leaseContainer = new Mock<DocumentServiceLeaseContainer>();
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty<DocumentServiceLease>()));
leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

Mock<DocumentServiceLeaseStoreManager> leaseStoreManager = new Mock<DocumentServiceLeaseStoreManager>();
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of<DocumentServiceLeaseManager>);
leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object);
leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of<DocumentServiceLeaseCheckpointer>);

ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions
{
StartFromBeginning = true,
};

ChangeFeedProcessorCore processor = null;
try
{
processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
processor.ApplyBuildConfiguration(
leaseStoreManager.Object,
null,
"instanceName",
new ChangeFeedLeaseOptions(),
options,
ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

await processor.StartAsync();

Assert.IsNull(options.StartTime);
}
finally
{
if (processor != null)
{
await processor.StopAsync();
}
}
}

[TestMethod]
public async Task ObserverIsCreated()
{
Expand Down
Loading