Skip to content

Commit e3c8a20

Browse files
authored
Allow Upsert to configure value length based on Input, in Tsavorite. (#714)
* Allow Upsert to configure value length based on Input, in Tsavorite. * fix to actually call api during creation of new record in upsert * fix test * fix ycsb value length
1 parent ace7cb6 commit e3c8a20

20 files changed

+79
-2
lines changed

libs/server/Storage/Functions/MainStore/VarLenInputMethods.cs

+3
Original file line numberDiff line numberDiff line change
@@ -255,5 +255,8 @@ public int GetRMWModifiedValueLength(ref SpanByte t, ref SpanByte input)
255255

256256
return sizeof(int) + input.Length - RespInputHeader.Size;
257257
}
258+
259+
public int GetUpsertValueLength(ref SpanByte t, ref SpanByte input)
260+
=> t.TotalSize;
258261
}
259262
}

libs/server/Storage/Functions/ObjectStore/VarLenInputMethods.cs

+5
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,10 @@ public int GetRMWInitialValueLength(ref ObjectInput input)
2222
{
2323
throw new GarnetException("GetRMWInitialValueLength is not available on the object store");
2424
}
25+
26+
public int GetUpsertValueLength(ref IGarnetObject value, ref ObjectInput input)
27+
{
28+
throw new GarnetException("GetUpsertInitialValueLength is not available on the object store");
29+
}
2530
}
2631
}

libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/SessionFunctions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, re
8282

8383
public int GetRMWModifiedValueLength(ref Value value, ref Input input) => 0;
8484
public int GetRMWInitialValueLength(ref Input input) => 0;
85+
public int GetUpsertValueLength(ref Value value, ref Input input) => Value.Size;
8586

8687
public void PostSingleDeleter(ref Key key, ref DeleteInfo deleteInfo) { }
8788

libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/Value.cs

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ namespace Tsavorite.benchmark
1212
[StructLayout(LayoutKind.Explicit, Size = 8)]
1313
public struct Value
1414
{
15+
public const int Size = 8;
16+
1517
[FieldOffset(0)]
1618
public long value;
1719
}

libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs

+1
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ public void RMWCompletionCallback(ref TKey key, ref TInput input, ref TOutput ou
336336

337337
public int GetRMWModifiedValueLength(ref TValue value, ref TInput input) => 0;
338338
public int GetRMWInitialValueLength(ref TInput input) => 0;
339+
public int GetUpsertValueLength(ref TValue value, ref TInput input) => 0;
339340

340341
public void ConvertOutputToHeap(ref TInput input, ref TOutput output) { }
341342
}

libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocator.cs

+6
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ public readonly (int actualSize, int allocatedSize, int keySize) GetRMWInitialRe
104104
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>
105105
=> BlittableAllocatorImpl<TKey, TValue, TStoreFunctions>.GetRMWInitialRecordSize(ref key, ref input, sessionFunctions);
106106

107+
/// <inheritdoc/>
108+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
109+
public readonly (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TValue value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
110+
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>
111+
=> BlittableAllocatorImpl<TKey, TValue, TStoreFunctions>.GetUpsertRecordSize(ref key, ref value, ref input, sessionFunctions);
112+
107113
/// <inheritdoc/>
108114
[MethodImpl(MethodImplOptions.AggressiveInlining)]
109115
public readonly (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref TKey key, ref TValue value)

libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocatorImpl.cs

+4
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public static (int actualSize, int allocatedSize, int keySize) GetRMWInitialReco
102102
[MethodImpl(MethodImplOptions.AggressiveInlining)]
103103
public static (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref TKey key, ref TValue value) => (RecordSize, RecordSize, KeySize);
104104

105+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
106+
public static (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TValue value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
107+
=> (RecordSize, RecordSize, KeySize);
108+
105109
[MethodImpl(MethodImplOptions.AggressiveInlining)]
106110
public static int GetValueLength(ref TValue value) => ValueSize;
107111

libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs

+6
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ public readonly (int actualSize, int allocatedSize, int keySize) GetRMWInitialRe
9797
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>
9898
=> _this.GetRMWInitialRecordSize(ref key, ref input, sessionFunctions);
9999

100+
/// <inheritdoc/>
101+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
102+
public readonly (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TValue value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
103+
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>
104+
=> _this.GetUpsertRecordSize(ref key, ref value, ref input, sessionFunctions);
105+
100106
/// <inheritdoc/>
101107
[MethodImpl(MethodImplOptions.AggressiveInlining)]
102108
public readonly (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref TKey key, ref TValue value) => _this.GetRecordSize(ref key, ref value);

libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocatorImpl.cs

+3
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ internal ref TValue GetValue(long physicalAddress)
151151

152152
internal (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref TKey key, ref TValue value) => (RecordSize, RecordSize, KeySize);
153153

154+
internal (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TValue value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
155+
=> (RecordSize, RecordSize, KeySize);
156+
154157
internal override bool TryComplete()
155158
{
156159
var b1 = objectLogDevice.TryComplete();

libs/storage/Tsavorite/cs/src/core/Allocator/IAllocator.cs

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ AllocatorBase<TKey, TValue, TStoreFunctions, TAllocator> GetBase<TAllocator>()
3131
(int actualSize, int allocatedSize, int keySize) GetRMWInitialRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
3232
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>;
3333

34+
/// <summary>Get record size required for the given <paramref name="key"/>, <paramref name="value"/>, and <paramref name="input"/></summary>
35+
(int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref TKey key, ref TValue value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
36+
where TSessionFunctionsWrapper : IVariableLengthInput<TValue, TInput>;
37+
3438
/// <summary>Get record size required for the given <paramref name="key"/> and <paramref name="value"/></summary>
3539
(int actualSize, int allocatedSize, int keySize) GetRecordSize(ref TKey key, ref TValue value);
3640

libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocator.cs

+6
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ public readonly (int actualSize, int allocatedSize, int keySize) GetRMWInitialRe
9898
where TSessionFunctionsWrapper : IVariableLengthInput<SpanByte, TInput>
9999
=> _this.GetRMWInitialRecordSize(ref key, ref input, sessionFunctions);
100100

101+
/// <inheritdoc/>
102+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
103+
public readonly (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref SpanByte key, ref SpanByte value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
104+
where TSessionFunctionsWrapper : IVariableLengthInput<SpanByte, TInput>
105+
=> _this.GetUpsertRecordSize(ref key, ref value, ref input, sessionFunctions);
106+
101107
/// <inheritdoc/>
102108
[MethodImpl(MethodImplOptions.AggressiveInlining)]
103109
public readonly (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref SpanByte key, ref SpanByte value) => _this.GetRecordSize(ref key, ref value);

libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs

+8
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,14 @@ public int GetRequiredRecordSize(long physicalAddress, int availableBytes)
176176
return (actualSize, RoundUp(actualSize, Constants.kRecordAlignment), keySize);
177177
}
178178

179+
public (int actualSize, int allocatedSize, int keySize) GetUpsertRecordSize<TInput, TSessionFunctionsWrapper>(ref SpanByte key, ref SpanByte value, ref TInput input, TSessionFunctionsWrapper sessionFunctions)
180+
where TSessionFunctionsWrapper : IVariableLengthInput<SpanByte, TInput>
181+
{
182+
int keySize = key.TotalSize;
183+
var actualSize = RecordInfo.GetLength() + RoundUp(keySize, Constants.kRecordAlignment) + sessionFunctions.GetUpsertValueLength(ref value, ref input);
184+
return (actualSize, RoundUp(actualSize, Constants.kRecordAlignment), keySize);
185+
}
186+
179187
public (int actualSize, int allocatedSize, int keySize) GetRecordSize(ref SpanByte key, ref SpanByte value)
180188
{
181189
int keySize = key.TotalSize;

libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs

+3
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ public void UnlockTransientShared(ref TKey key, ref OperationStackContext<TKey,
197197
[MethodImpl(MethodImplOptions.AggressiveInlining)]
198198
public int GetRMWModifiedValueLength(ref TValue t, ref TInput input) => _clientSession.functions.GetRMWModifiedValueLength(ref t, ref input);
199199

200+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
201+
public int GetUpsertValueLength(ref TValue t, ref TInput input) => _clientSession.functions.GetUpsertValueLength(ref t, ref input);
202+
200203
[MethodImpl(MethodImplOptions.AggressiveInlining)]
201204
public IHeapContainer<TInput> GetHeapContainer(ref TInput input)
202205
{

libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public void RMWCompletionCallback(ref TKey key, ref TInput input, ref TOutput ou
5252

5353
public int GetRMWModifiedValueLength(ref TValue value, ref TInput input) => 0;
5454
public int GetRMWInitialValueLength(ref TInput input) => 0;
55+
public int GetUpsertValueLength(ref TValue value, ref TInput input) => _functions.GetUpsertValueLength(ref value, ref input);
5556

5657
/// <summary>
5758
/// No reads during compaction

libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs

+5
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ public interface ISessionFunctions<TKey, TValue, TInput, TOutput, TContext>
189189
/// Initial expected length of value object when populated by RMW using given input
190190
/// </summary>
191191
int GetRMWInitialValueLength(ref TInput input);
192+
193+
/// <summary>
194+
/// Length of resulting value object when performing Upsert of value using given input
195+
/// </summary>
196+
int GetUpsertValueLength(ref TValue value, ref TInput input);
192197
#endregion Variable-length value size
193198

194199
/// <summary>

libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public virtual void RMWCompletionCallback(ref TKey key, ref TInput input, ref TO
5757
public virtual int GetRMWModifiedValueLength(ref TValue value, ref TInput input) => throw new TsavoriteException("GetRMWModifiedValueLength is only available for SpanByte Functions");
5858
/// <inheritdoc/>
5959
public virtual int GetRMWInitialValueLength(ref TInput input) => throw new TsavoriteException("GetRMWInitialValueLength is only available for SpanByte Functions");
60+
/// <inheritdoc/>
61+
public virtual int GetUpsertValueLength(ref TValue value, ref TInput input) => throw new TsavoriteException("GetUpsertValueLength is only available for SpanByte Functions");
6062

6163
/// <inheritdoc/>
6264
public virtual void ConvertOutputToHeap(ref TInput input, ref TOutput output) { }

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalUpsert.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private OperationStatus CreateNewRecordUpsert<TInput, TOutput, TContext, TSessio
306306
ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, ref RecordInfo srcRecordInfo)
307307
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
308308
{
309-
var (actualSize, allocatedSize, keySize) = hlog.GetRecordSize(ref key, ref value); // Input is not included in record-length calculations for Upsert
309+
var (actualSize, allocatedSize, keySize) = hlog.GetUpsertRecordSize(ref key, ref value, ref input, sessionFunctions);
310310
AllocateOptions allocOptions = new()
311311
{
312312
Recycle = true,

libs/storage/Tsavorite/cs/src/core/VarLen/IVariableLengthInput.cs

+8
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,13 @@ public interface IVariableLengthInput<TValue, TInput>
1919
/// <param name="input"></param>
2020
/// <returns></returns>
2121
int GetRMWInitialValueLength(ref TInput input);
22+
23+
/// <summary>
24+
/// Length of value object, when populated by Upsert using given value and input
25+
/// </summary>
26+
/// <param name="value"></param>
27+
/// <param name="input"></param>
28+
/// <returns></returns>
29+
int GetUpsertValueLength(ref TValue value, ref TInput input);
2230
}
2331
}

libs/storage/Tsavorite/cs/src/core/VarLen/SpanByteFunctions.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System.Buffers;
55
using System.Runtime.CompilerServices;
6-
using static Tsavorite.core.Utility;
76

87
namespace Tsavorite.core
98
{
@@ -125,5 +124,12 @@ public override int GetRMWModifiedValueLength(ref SpanByte t, ref SpanByte input
125124

126125
/// <inheritdoc/>
127126
public override int GetRMWInitialValueLength(ref SpanByte input) => input.TotalSize;
127+
128+
/// <summary>
129+
/// Length of resulting object when doing Upsert with given value and input. Here we set the length to the
130+
/// length of the provided value, ignoring input. You can provide a custom implementation for other cases.
131+
/// </summary>
132+
public override int GetUpsertValueLength(ref SpanByte t, ref SpanByte input)
133+
=> t.TotalSize;
128134
}
129135
}

libs/storage/Tsavorite/cs/test/ExpirationTests.cs

+3
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,9 @@ public override void ReadCompletionCallback(ref SpanByte key, ref ExpirationInpu
475475
/// <inheritdoc/>
476476
public override int GetRMWInitialValueLength(ref ExpirationInput input) => MinValueLen;
477477

478+
/// <inheritdoc/>
479+
public override int GetUpsertValueLength(ref SpanByte value, ref ExpirationInput input) => value.TotalSize;
480+
478481
// Read functions
479482
public override unsafe bool SingleReader(ref SpanByte key, ref ExpirationInput input, ref SpanByte value, ref ExpirationOutput output, ref ReadInfo readInfo)
480483
{

0 commit comments

Comments
 (0)