Skip to content

Commit 401c185

Browse files
TedHartMSbadrishcvazois
authored
Add a ScanCursor wrapper for store iteration (#669)
* Add a ScanCursor wrapper for IterateMainStore() and IterateObjectStore() to avoid tempKV Remove a couple unused Iterate functions Fix reversed Assert.AreEqual params in a test * formatting fix * Convert another ScanFunctions implementation to class --------- Co-authored-by: Badrish Chandramouli <[email protected]> Co-authored-by: Vasileios Zois <[email protected]>
1 parent de53280 commit 401c185

File tree

9 files changed

+52
-48
lines changed

9 files changed

+52
-48
lines changed

libs/cluster/Server/Migration/MigrationKeyIterationFunctions.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ internal sealed unsafe partial class MigrateSession : IDisposable
1414
{
1515
internal sealed class MigrationKeyIterationFunctions
1616
{
17-
internal unsafe struct MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
17+
internal sealed unsafe class MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
1818
{
1919
MigrationScanIterator iterator;
2020

@@ -90,7 +90,7 @@ public void OnStop(bool completed, long numberOfRecords) { }
9090
public void OnException(Exception exception, long numberOfRecords) { }
9191
}
9292

93-
internal struct MigrationScanIterator
93+
internal sealed class MigrationScanIterator
9494
{
9595
readonly MigrateSession session;
9696
readonly HashSet<int> slots;

libs/cluster/Session/ClusterKeyIterationFunctions.cs

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
1313
{
1414
internal static class ClusterKeyIterationFunctions
1515
{
16-
internal struct MainStoreCountKeys : IScanIteratorFunctions<SpanByte, SpanByte>
16+
internal sealed class MainStoreCountKeys : IScanIteratorFunctions<SpanByte, SpanByte>
1717
{
18+
// This must be a class as it is passed through pending IO operations
1819
internal int keyCount;
1920
readonly int slot;
2021

@@ -34,8 +35,9 @@ public void OnStop(bool completed, long numberOfRecords) { }
3435
public void OnException(Exception exception, long numberOfRecords) { }
3536
}
3637

37-
internal struct ObjectStoreCountKeys : IScanIteratorFunctions<byte[], IGarnetObject>
38+
internal sealed class ObjectStoreCountKeys : IScanIteratorFunctions<byte[], IGarnetObject>
3839
{
40+
// This must be a class as it is passed through pending IO operations
3941
internal int keyCount;
4042
readonly int slot;
4143

libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,15 @@ internal unsafe bool DbScan(ArgSlice patternB, bool allKeys, long cursor, out lo
111111
}
112112

113113
/// <summary>
114-
/// Iterate the contents of the main store
114+
/// Iterate the contents of the main store (push-based)
115115
/// </summary>
116116
/// <typeparam name="TScanFunctions"></typeparam>
117117
/// <param name="scanFunctions"></param>
118118
/// <param name="untilAddress"></param>
119119
/// <returns></returns>
120120
internal bool IterateMainStore<TScanFunctions>(ref TScanFunctions scanFunctions, long untilAddress = -1)
121121
where TScanFunctions : IScanIteratorFunctions<SpanByte, SpanByte>
122-
=> basicContext.Session.Iterate(ref scanFunctions, untilAddress);
122+
=> basicContext.Session.IterateLookup(ref scanFunctions, untilAddress);
123123

124124
/// <summary>
125125
/// Iterate the contents of the main store (pull based)
@@ -128,15 +128,15 @@ internal ITsavoriteScanIterator<SpanByte, SpanByte> IterateMainStore()
128128
=> basicContext.Session.Iterate();
129129

130130
/// <summary>
131-
/// Iterate the contents of the object store
131+
/// Iterate the contents of the object store (push-based)
132132
/// </summary>
133133
/// <typeparam name="TScanFunctions"></typeparam>
134134
/// <param name="scanFunctions"></param>
135135
/// <param name="untilAddress"></param>
136136
/// <returns></returns>
137137
internal bool IterateObjectStore<TScanFunctions>(ref TScanFunctions scanFunctions, long untilAddress = -1)
138138
where TScanFunctions : IScanIteratorFunctions<byte[], IGarnetObject>
139-
=> objectStoreBasicContext.Session.Iterate(ref scanFunctions, untilAddress);
139+
=> objectStoreBasicContext.Session.IterateLookup(ref scanFunctions, untilAddress);
140140

141141
/// <summary>
142142
/// Iterate the contents of the main store (pull based)

libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs

+2-17
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,6 @@
88

99
namespace Tsavorite.core
1010
{
11-
internal sealed class ScanCursorState<TKey, TValue>
12-
{
13-
internal IScanIteratorFunctions<TKey, TValue> functions;
14-
internal long acceptedCount; // Number of records pushed to and accepted by the caller
15-
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as of "count" records had been returned)
16-
internal bool stop; // Stop the operation (as if all records in the db had been returned)
17-
18-
internal void Initialize(IScanIteratorFunctions<TKey, TValue> scanIteratorFunctions)
19-
{
20-
functions = scanIteratorFunctions;
21-
acceptedCount = 0;
22-
endBatch = false;
23-
stop = false;
24-
}
25-
}
26-
2711
public abstract partial class AllocatorBase<TKey, TValue, TStoreFunctions, TAllocator> : IDisposable
2812
where TStoreFunctions : IStoreFunctions<TKey, TValue>
2913
where TAllocator : IAllocator<TKey, TValue, TStoreFunctions>
@@ -205,7 +189,8 @@ internal unsafe bool GetFromDiskAndPushToReader<TScanFunctions>(ref TKey key, re
205189
internal abstract bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
206190
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>;
207191

208-
private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor)
192+
private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store,
193+
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor)
209194
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>
210195
where TScanIterator : ITsavoriteScanIterator<TKey, TValue>, IPushScanIterator<TKey>
211196
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
namespace Tsavorite.core
5+
{
6+
internal sealed class ScanCursorState<TKey, TValue>
7+
{
8+
internal IScanIteratorFunctions<TKey, TValue> functions;
9+
internal long acceptedCount; // Number of records pushed to and accepted by the caller
10+
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as of "count" records had been returned)
11+
internal bool stop; // Stop the operation (as if all records in the db had been returned)
12+
13+
internal void Initialize(IScanIteratorFunctions<TKey, TValue> scanIteratorFunctions)
14+
{
15+
functions = scanIteratorFunctions;
16+
acceptedCount = 0;
17+
endBatch = false;
18+
stop = false;
19+
}
20+
}
21+
}

libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs

+16-1
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ public ITsavoriteScanIterator<TKey, TValue> Iterate(long untilAddress = -1)
467467
=> store.Iterate<TInput, TOutput, TContext, TFunctions>(functions, untilAddress);
468468

469469
/// <summary>
470-
/// Push iteration of all (distinct) live key-values stored in Tsavorite
470+
/// Push iteration of all (distinct) live key-values stored in Tsavorite, using a temporary TsavoriteKV to ensure uniqueness
471471
/// </summary>
472472
/// <param name="scanFunctions">Functions receiving pushed records</param>
473473
/// <param name="untilAddress">Report records until this address (tail by default)</param>
@@ -476,6 +476,21 @@ public bool Iterate<TScanFunctions>(ref TScanFunctions scanFunctions, long until
476476
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>
477477
=> store.Iterate<TInput, TOutput, TContext, TFunctions, TScanFunctions>(functions, ref scanFunctions, untilAddress);
478478

479+
/// <summary>
480+
/// Push iteration of all (distinct) live key-values stored in Tsavorite, using a lookup strategy to ensure uniqueness
481+
/// </summary>
482+
/// <param name="scanFunctions">Functions receiving pushed records</param>
483+
/// <param name="untilAddress">Report records until this address (tail by default)</param>
484+
/// <returns>True if Iteration completed; false if Iteration ended early due to one of the TScanIterator reader functions returning false</returns>
485+
public bool IterateLookup<TScanFunctions>(ref TScanFunctions scanFunctions, long untilAddress = -1)
486+
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>
487+
{
488+
if (untilAddress == -1)
489+
untilAddress = store.Log.TailAddress;
490+
var cursor = 0L;
491+
return ScanCursor(ref cursor, count: long.MaxValue, scanFunctions, endAddress: untilAddress);
492+
}
493+
479494
/// <summary>
480495
/// Push-scan the log from <paramref name="cursor"/> (which should be a valid address) and push up to <paramref name="count"/> records
481496
/// to the caller via <paramref name="scanFunctions"/> for each Key that is not found at a higher address.

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/FindRecord.cs

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ internal bool TryFindRecordInMainLog(ref TKey key, ref OperationStackContext<TKe
4949
}
5050

5151
[MethodImpl(MethodImplOptions.AggressiveInlining)]
52+
// Return true if the record is found in the log, else false and an indication of whether we need to do IO to continue the search
5253
internal bool TryFindRecordInMainLogForConditionalOperation<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions,
5354
ref TKey key, ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, long minAddress, out OperationStatus internalStatus, out bool needIO)
5455
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteIterator.cs

-20
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,6 @@ public bool Iterate<TInput, TOutput, TContext, TFunctions, TScanFunctions>(TFunc
5151
scanFunctions.OnStop(!stop, numRecords);
5252
return !stop;
5353
}
54-
55-
/// <summary>
56-
/// Iterator for all (distinct) live key-values stored in Tsavorite
57-
/// </summary>
58-
/// <param name="untilAddress">Report records until this address (tail by default)</param>
59-
/// <returns>Tsavorite iterator</returns>
60-
[Obsolete("Invoke Iterate() on a client session (ClientSession), or use store.Iterate overload with Functions provided as parameter")]
61-
public ITsavoriteScanIterator<TKey, TValue> Iterate(long untilAddress = -1)
62-
=> throw new TsavoriteException("Invoke Iterate() on a client session (ClientSession), or use store.Iterate overload with Functions provided as parameter");
63-
64-
/// <summary>
65-
/// Iterator for all (distinct) live key-values stored in Tsavorite
66-
/// </summary>
67-
/// <param name="compactionFunctions">User provided compaction functions (see <see cref="ICompactionFunctions{Key, Value}"/>).</param>
68-
/// <param name="untilAddress">Report records until this address (tail by default)</param>
69-
/// <returns>Tsavorite iterator</returns>
70-
[Obsolete("Invoke Iterate() on a client session (ClientSession), or use store.Iterate overload with Functions provided as parameter")]
71-
public ITsavoriteScanIterator<TKey, TValue> Iterate<CompactionFunctions>(CompactionFunctions compactionFunctions, long untilAddress = -1)
72-
where CompactionFunctions : ICompactionFunctions<TKey, TValue>
73-
=> throw new TsavoriteException("Invoke Iterate() on a client session (ClientSession), or use store.Iterate overload with Functions provided as parameter");
7454
}
7555

7656
internal sealed class TsavoriteKVIterator<TKey, TValue, TInput, TOutput, TContext, TFunctions, TStoreFunctions, TAllocator> : ITsavoriteScanIterator<TKey, TValue>

test/Garnet.test.cluster/ClusterMigrateTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,12 @@ public void ClusterSimpleSlotInfo()
266266

267267
var sourceIndex = context.clusterTestUtils.GetSourceNodeIndexFromSlot((ushort)slot, context.logger);
268268
var expectedKeyCount = context.clusterTestUtils.CountKeysInSlot(slot, context.logger);
269-
ClassicAssert.AreEqual(expectedKeyCount, keyCount);
269+
ClassicAssert.AreEqual(keyCount, expectedKeyCount);
270270
_ = context.clusterTestUtils.CountKeysInSlot(-1, context.logger);
271271
_ = context.clusterTestUtils.CountKeysInSlot(ushort.MaxValue, context.logger);
272272

273273
var result = context.clusterTestUtils.GetKeysInSlot(sourceIndex, slot, expectedKeyCount, context.logger);
274-
ClassicAssert.AreEqual(result.Count, keyCount);
274+
ClassicAssert.AreEqual(keyCount, result.Count);
275275
_ = context.clusterTestUtils.GetKeysInSlot(-1, expectedKeyCount);
276276
_ = context.clusterTestUtils.GetKeysInSlot(ushort.MaxValue, expectedKeyCount);
277277

0 commit comments

Comments
 (0)