diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index e96747d3..92e340e6 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -206,6 +206,11 @@ public class NetheriteOrchestrationServiceSettings
///
public int PartitionStartupTimeoutMinutes { get; set; } = 15;
+ ///
+ /// If true, disables the prefetching during replay of the commit log when a partition recovers.
+ /// Even when this is false, recovery skips prefetching on every other attempt once more than
+ /// six recovery attempts have been recorded for the current checkpoint.
+ ///
+ public bool DisablePrefetchDuringReplay { get; set; } = false;
+
///
/// Allows attaching additional checkers and debuggers during testing.
///
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
index d2871f17..6af3503a 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
@@ -42,6 +42,8 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
BlobUtilsV12.BlockBlobClients eventLogCommitBlob;
BlobLeaseClient leaseClient;
+ BlobUtilsV12.BlockBlobClients checkpointCompletedBlob;
+
BlobUtilsV12.BlobDirectory pageBlobPartitionDirectory;
BlobUtilsV12.BlobDirectory blockBlobPartitionDirectory;
@@ -419,6 +421,7 @@ public async Task StartAsync()
this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(CommitBlobName);
this.leaseClient = this.eventLogCommitBlob.WithRetries.GetBlobLeaseClient();
+ this.checkpointCompletedBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());
AzureStorageDevice createDevice(string name) =>
new AzureStorageDevice(name, this.blockBlobPartitionDirectory.GetSubDirectory(name), this.pageBlobPartitionDirectory.GetSubDirectory(name), this, true);
@@ -1057,10 +1060,11 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens()
internal async Task FindCheckpointsAsync(bool logIsEmpty)
{
- BlobUtilsV12.BlockBlobClients checkpointCompletedBlob = default;
+ string jsonString = null;
+ DateTimeOffset lastModified = default;
+
try
{
- string jsonString = null;
if (this.UseLocalFiles)
{
@@ -1076,7 +1080,6 @@ internal async Task FindCheckpointsAsync(bool logIsEmpty)
else
{
var partDir = this.blockBlobPartitionDirectory;
- checkpointCompletedBlob = partDir.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());
await this.PerformWithRetriesAsync(
semaphore: null,
@@ -1084,7 +1087,7 @@ await this.PerformWithRetriesAsync(
"BlockBlobClient.DownloadContentAsync",
"FindCheckpointsAsync",
"",
- checkpointCompletedBlob.Name,
+ this.checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: false,
@@ -1092,8 +1095,9 @@ await this.PerformWithRetriesAsync(
{
try
{
- Azure.Response downloadResult = await checkpointCompletedBlob.WithRetries.DownloadContentAsync();
+ Azure.Response downloadResult = await this.checkpointCompletedBlob.WithRetries.DownloadContentAsync();
jsonString = downloadResult.Value.Content.ToString();
+ lastModified = downloadResult.Value.Details.LastModified;
this.CheckpointInfoETag = downloadResult.Value.Details.ETag;
return 1;
}
@@ -1105,22 +1109,65 @@ await this.PerformWithRetriesAsync(
});
}
- if (jsonString == null)
- {
- return false;
- }
- else
- {
- // read the fields from the json to update the checkpoint info
- JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
- return true;
- }
}
catch (Exception e)
{
- this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
+ this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", this.checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
+ throw;
+ }
+
+ if (jsonString == null)
+ {
+ return false;
+ }
+
+ try
+ {
+ // read the fields from the json to update the checkpoint info
+ JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
+ }
+ catch (JsonException e)
+ {
+ this.HandleStorageError(nameof(FindCheckpointsAsync), "could not parse json file describing last checkpoint", this.checkpointCompletedBlob.Name, e, true, false);
throw;
}
+
+ if (this.CheckpointInfo.RecoveryAttempts > 0 || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5))
+ {
+ this.CheckpointInfo.RecoveryAttempts++;
+
+ this.TraceHelper.FasterProgress($"Incremented recovery attempt counter to {this.CheckpointInfo.RecoveryAttempts} in {this.checkpointCompletedBlob.Name}.");
+
+ await this.WriteCheckpointMetadataAsync();
+
+ // we start to boost the tracing after three failed attempts. This boosting applies to the recovery part only.
+ int StartBoostingAfter = 3;
+
+ // After some number of boosted attempts, we stop boosting since it seems unlikely that we will find new information.
+ int BoostFor = 10;
+
+ if (this.CheckpointInfo.RecoveryAttempts > StartBoostingAfter
+ && this.CheckpointInfo.RecoveryAttempts <= StartBoostingAfter + BoostFor)
+ {
+ this.TraceHelper.BoostTracing = true;
+ }
+ }
+
+ return true;
+ }
+
+ ///
+ /// Resets the recovery attempt counter of the current checkpoint after a successful recovery.
+ /// Does nothing if no attempts were recorded. Otherwise zeroes the counter, turns boosted
+ /// tracing off, and rewrites the checkpoint metadata so the reset is persisted.
+ ///
+ public async Task ClearRecoveryAttempts()
+ {
+ if (this.CheckpointInfo.RecoveryAttempts <= 0)
+ {
+ // no attempts were recorded, so there is nothing to reset or rewrite
+ return;
+ }
+
+ this.CheckpointInfo.RecoveryAttempts = 0;
+ this.TraceHelper.BoostTracing = false;
+
+ // persist the cleared counter before reporting it
+ await this.WriteCheckpointMetadataAsync();
+
+ this.TraceHelper.FasterProgress($"Cleared recovery attempt counter in {this.checkpointCompletedBlob.Name}.");
}
void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
@@ -1436,7 +1483,7 @@ await this.PerformWithRetriesAsync(
}
}
- internal async Task FinalizeCheckpointCompletedAsync()
+ internal async Task WriteCheckpointMetadataAsync()
{
var jsonText = JsonConvert.SerializeObject(this.CheckpointInfo, Formatting.Indented);
if (this.UseLocalFiles)
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs
index 40f8d5da..30048422 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs
@@ -10,25 +10,46 @@ namespace DurableTask.Netherite.Faster
+ ///
+ /// Describes the most recent checkpoint of a partition. BlobManager serializes this object to JSON
+ /// when it writes the checkpoint metadata, and populates it from that JSON when looking for checkpoints.
+ ///
[JsonObject]
class CheckpointInfo
{
+ ///
+ /// The FasterKV token for the last index checkpoint taken before this checkpoint.
+ ///
[JsonProperty]
public Guid IndexToken { get; set; }
+ ///
+ /// The FasterKV token for this checkpoint.
+ ///
[JsonProperty]
public Guid LogToken { get; set; }
+ ///
+ /// The FasterLog position for this checkpoint.
+ ///
[JsonProperty]
public long CommitLogPosition { get; set; }
+ ///
+ /// The input queue (event hubs) position for this checkpoint.
+ ///
[JsonProperty]
public long InputQueuePosition { get; set; }
+ ///
+ /// If the input queue position is a batch, the position within the batch.
+ ///
[JsonProperty]
public int InputQueueBatchPosition { get; set; }
+ ///
+ /// The input queue fingerprint for this checkpoint.
+ ///
[JsonProperty]
public string InputQueueFingerprint { get; set; }
- [JsonProperty]
- public long NumberInstances { get; set; }
+ ///
+ /// The number of recovery attempts that have been made for this checkpoint.
+ /// BlobManager.FindCheckpointsAsync increments and persists it when the counter is already nonzero
+ /// or the checkpoint metadata is more than five minutes old; BlobManager.ClearRecoveryAttempts
+ /// resets it to zero after recovery completes. It must be persisted like the other members.
+ ///
+ [JsonProperty]
+ public int RecoveryAttempts { get; set; }
}
}
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
index a7b323c0..3c152c85 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -380,7 +380,7 @@ void RunTask() {
public override async Task FinalizeCheckpointCompletedAsync(Guid guid)
{
- await this.blobManager.FinalizeCheckpointCompletedAsync();
+ await this.blobManager.WriteCheckpointMetadataAsync();
if (this.cacheDebugger == null)
{
@@ -739,7 +739,7 @@ public override async Task RunPrefetchSession(IAsyncEnumerable
long lastReport = 0;
void ReportProgress(int elapsedMillisecondsThreshold)
{
- if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold)
+ if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold || this.TraceHelper.BoostTracing)
{
this.blobManager.TraceHelper.FasterProgress(
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
index e42650f9..382948ab 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
@@ -25,7 +25,9 @@ public FasterTraceHelper(ILogger logger, LogLevel logLevelLimit, ILogger perform
this.partitionId = (int) partitionId;
}
- public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace;
+ ///
+ /// True if the log level limit is Trace, or if tracing is currently boosted.
+ ///
+ public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace || this.BoostTracing;
+
+ ///
+ /// When set, the level checks in this helper behave as if the log level limit were Trace.
+ /// BlobManager turns this on after repeated failed recovery attempts and turns it off again
+ /// when the recovery attempt counter is cleared.
+ ///
+ public bool BoostTracing { get; set; }
// ----- faster storage layer events
@@ -139,7 +141,7 @@ public void FasterProgress(Func constructString)
public void FasterStorageProgress(string details)
{
- if (this.logLevelLimit <= LogLevel.Trace)
+ if (this.logLevelLimit <= LogLevel.Trace || this.BoostTracing)
{
this.logger.LogTrace("Part{partition:D2} {details}", this.partitionId, details);
EtwSource.Log.FasterStorageProgress(this.account, this.taskHub, this.partitionId, details, TraceUtils.AppName, TraceUtils.ExtensionVersion);
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
index 16101ace..601f9fcb 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
@@ -207,7 +207,7 @@ protected override async Task Process(IList batch)
}
}
- public async Task ReplayCommitLog(long from, StoreWorker worker)
+ public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePrefetch)
{
// this procedure is called by StoreWorker during recovery. It replays all the events
// that were committed to the log but are not reflected in the loaded store checkpoint.
@@ -215,12 +215,16 @@ public async Task ReplayCommitLog(long from, StoreWorker worker)
{
// we create a pipeline where the fetch task obtains a stream of events and then duplicates the
// stream, so it can get replayed and prefetched in parallel.
- var prefetchChannel = Channel.CreateBounded(1000);
+ var prefetchChannel = enablePrefetch ? Channel.CreateBounded(1000) : null;
var replayChannel = Channel.CreateBounded(1000);
- var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer);
+ var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel?.Writer);
var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker));
- var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));
+
+ if (enablePrefetch)
+ {
+ var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));
+ }
await fetchTask;
await replayTask;
@@ -241,23 +245,33 @@ async Task FetchEvents(long from, ChannelWriter replayChan
await replayChannelWriter.WriteAsync(partitionEvent);
- if (partitionEvent is IRequiresPrefetch evt)
+ if (prefetchChannelWriter != null && partitionEvent is IRequiresPrefetch evt)
{
foreach (var key in evt.KeysToPrefetch)
{
+ if (this.traceHelper.BoostTracing)
+ {
+ this.traceHelper.FasterProgress($"Replay Prefetches {key}");
+ }
+
await prefetchChannelWriter.WriteAsync(key);
}
}
}
replayChannelWriter.Complete();
- prefetchChannelWriter.Complete();
+ prefetchChannelWriter?.Complete();
}
+ // Replays the partition events read from the channel, one at a time and in the order received,
+ // by passing each to worker.ReplayUpdate. The loop ends when the channel has been completed
+ // and drained; reading observes this.cancellationToken.
async Task ReplayEvents(ChannelReader reader, StoreWorker worker)
{
await foreach (var partitionEvent in reader.ReadAllAsync(this.cancellationToken))
{
+ // with boosted tracing, record the position of every event before it is replayed
+ if (this.traceHelper.BoostTracing)
+ {
+ this.traceHelper.FasterProgress($"Replaying PartitionEvent {partitionEvent.NextCommitLogPosition}");
+ }
+
await worker.ReplayUpdate(partitionEvent);
}
}
@@ -285,7 +299,7 @@ async IAsyncEnumerable EventsToReplay(long from)
await iter.WaitAsync(this.cancellationToken);
}
- if (this.traceLogDetails)
+ if (this.traceHelper.IsTracingAtMostDetailedLevel)
{
this.TraceLogDetail("Read", iter.NextAddress, new ReadOnlySpan(result, 0, entryLength));
}
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
index f534db80..6434dcb8 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -20,9 +20,6 @@ class PartitionStorage : IPartitionState
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
- //readonly CloudStorageAccount storageAccount;
- //readonly string localFileDirectory;
- //readonly CloudStorageAccount pageBlobStorageAccount;
Partition partition;
BlobManager blobManager;
@@ -193,7 +190,14 @@ async Task TerminationWrapper(Task what)
if (this.log.TailAddress > (long)this.storeWorker.CommitLogPosition)
{
// replay log as the store checkpoint lags behind the log
- await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker));
+
+ // after six unsuccessful attempts, we start disabling prefetch on every other attempt, to see if this can remedy the problem
+ int startDisablingPrefetchAfter = 6;
+
+ bool disablePrefetch = this.settings.DisablePrefetchDuringReplay
+ || (this.blobManager.CheckpointInfo.RecoveryAttempts > startDisablingPrefetchAfter && (this.blobManager.CheckpointInfo.RecoveryAttempts - startDisablingPrefetchAfter) % 2 == 1);
+
+ await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch));
}
}
catch (OperationCanceledException) when (this.partition.ErrorHandler.IsTerminated)
@@ -215,6 +219,8 @@ async Task TerminationWrapper(Task what)
}
this.TraceHelper.FasterProgress("Recovery complete");
+
+ await this.blobManager.ClearRecoveryAttempts();
}
this.blobManager.FaultInjector?.Started(this.blobManager);
return this.storeWorker.InputQueuePosition;
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
index 4b0238b9..95ad1488 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -681,16 +681,16 @@ protected override async Task Process(IList batch)
return target;
}
- public async Task ReplayCommitLog(LogWorker logWorker)
+ public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch)
{
var startPosition = this.CommitLogPosition;
- this.traceHelper.FasterProgress($"Replaying log from {startPosition}");
+ this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch} boostTracing={this.traceHelper.BoostTracing}");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
this.effectTracker.IsReplaying = true;
- await logWorker.ReplayCommitLog(startPosition, this);
+ await logWorker.ReplayCommitLog(startPosition, this, prefetch);
stopwatch.Stop();
this.effectTracker.IsReplaying = false;