-
Notifications
You must be signed in to change notification settings - Fork 550
/
Copy pathGarnetServer.cs
453 lines (384 loc) · 19.9 KB
/
GarnetServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using Garnet.cluster;
using Garnet.common;
using Garnet.networking;
using Garnet.server;
using Garnet.server.Auth.Settings;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
namespace Garnet
{
using MainStoreAllocator = SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>;
using MainStoreFunctions = StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>;
using ObjectStoreAllocator = GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>;
using ObjectStoreFunctions = StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>;
/// <summary>
/// Implementation Garnet server
/// </summary>
public class GarnetServer : IDisposable
{
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8.
readonly string version = "1.0.37";
internal GarnetProvider Provider;
private readonly GarnetServerOptions opts;
private IGarnetServer server;
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> store;
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> objectStore;
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
private SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> subscribeBroker;
private KVSettings<SpanByte, SpanByte> kvSettings;
private KVSettings<byte[], IGarnetObject> objKvSettings;
private INamedDeviceFactory logFactory;
private MemoryLogger initLogger;
private ILogger logger;
private readonly ILoggerFactory loggerFactory;
private readonly bool cleanupDir;
private bool disposeLoggerFactory;
/// <summary>
/// Store and associated information used by this Garnet server
/// </summary>
protected StoreWrapper storeWrapper;
/// <summary>
/// Resp protocol version
/// </summary>
readonly string redisProtocolVersion = "7.2.5";
/// <summary>
/// Metrics API
/// </summary>
public MetricsApi Metrics;
/// <summary>
/// Command registration API
/// </summary>
public RegisterApi Register;
/// <summary>
/// Store API
/// </summary>
public StoreApi Store;
/// <summary>
/// Create Garnet Server instance using specified command line arguments; use Start to start the server.
/// </summary>
/// <param name="commandLineArgs">Command line arguments</param>
/// <param name="loggerFactory">Logger factory</param>
/// <param name="cleanupDir">Clean up directory.</param>
/// <param name="authenticationSettingsOverride">Override for custom authentication settings.</param>
public GarnetServer(string[] commandLineArgs, ILoggerFactory loggerFactory = null, bool cleanupDir = false, IAuthenticationSettings authenticationSettingsOverride = null)
{
Trace.Listeners.Add(new ConsoleTraceListener());
// Set up an initial memory logger to log messages from configuration parser into memory.
using (var memLogProvider = new MemoryLoggerProvider())
{
this.initLogger = (MemoryLogger)memLogProvider.CreateLogger("ArgParser");
}
if (!ServerSettingsManager.TryParseCommandLineArguments(commandLineArgs, out var serverSettings, out _, this.initLogger))
{
// Flush logs from memory logger
FlushMemoryLogger(this.initLogger, "ArgParser", loggerFactory);
throw new GarnetException("Encountered an error when initializing Garnet server. Please see log messages above for more details.");
}
if (loggerFactory == null)
{
// If the main logger factory is created by GarnetServer, it should be disposed when GarnetServer is disposed
disposeLoggerFactory = true;
}
else
{
this.initLogger.LogWarning(
$"Received an external ILoggerFactory object. The following configuration options are ignored: {nameof(serverSettings.FileLogger)}, {nameof(serverSettings.LogLevel)}, {nameof(serverSettings.DisableConsoleLogger)}.");
}
// If no logger factory is given, set up main logger factory based on parsed configuration values,
// otherwise use given logger factory.
this.loggerFactory = loggerFactory ?? LoggerFactory.Create(builder =>
{
if (!serverSettings.DisableConsoleLogger.GetValueOrDefault())
{
builder.AddSimpleConsole(options =>
{
options.SingleLine = true;
options.TimestampFormat = "hh::mm::ss ";
});
}
// Optional: Flush log output to file.
if (serverSettings.FileLogger != null)
builder.AddFile(serverSettings.FileLogger);
builder.SetMinimumLevel(serverSettings.LogLevel);
});
// Assign values to GarnetServerOptions
this.opts = serverSettings.GetServerOptions(this.loggerFactory.CreateLogger("Options"));
this.opts.AuthSettings = authenticationSettingsOverride ?? this.opts.AuthSettings;
this.cleanupDir = cleanupDir;
this.InitializeServer();
}
/// <summary>
/// Create Garnet Server instance using GarnetServerOptions instance; use Start to start the server.
/// </summary>
/// <param name="opts">Server options</param>
/// <param name="loggerFactory">Logger factory</param>
/// <param name="server">The IGarnetServer to use. If none is provided, will use a GarnetServerTcp.</param>
/// <param name="cleanupDir">Whether to clean up data folders on dispose</param>
public GarnetServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, IGarnetServer server = null, bool cleanupDir = false)
{
this.server = server;
this.opts = opts;
this.loggerFactory = loggerFactory;
this.cleanupDir = cleanupDir;
this.InitializeServer();
}
private void InitializeServer()
{
Debug.Assert(opts != null);
if (!opts.QuietMode)
{
var red = "\u001b[31m";
var magenta = "\u001b[35m";
var normal = "\u001b[0m";
Console.WriteLine($@"{red} _________
/_||___||_\ {normal}Garnet {version} {(IntPtr.Size == 8 ? "64" : "32")} bit; {(opts.EnableCluster ? "cluster" : "standalone")} mode{red}
'. \ / .' {normal}Port: {opts.Port}{red}
'.\ /.' {magenta}https://aka.ms/GetGarnet{red}
'.'
{normal}");
}
var clusterFactory = opts.EnableCluster ? new ClusterFactory() : null;
this.logger = this.loggerFactory?.CreateLogger("GarnetServer");
logger?.LogInformation("Garnet {version} {bits} bit; {clusterMode} mode; Port: {port}", version, IntPtr.Size == 8 ? "64" : "32", opts.EnableCluster ? "cluster" : "standalone", opts.Port);
// Flush initialization logs from memory logger
FlushMemoryLogger(this.initLogger, "ArgParser", this.loggerFactory);
var customCommandManager = new CustomCommandManager();
var setMax = opts.ThreadPoolMaxThreads <= 0 || ThreadPool.SetMaxThreads(opts.ThreadPoolMaxThreads, opts.ThreadPoolMaxThreads);
if (opts.ThreadPoolMinThreads > 0 && !ThreadPool.SetMinThreads(opts.ThreadPoolMinThreads, opts.ThreadPoolMinThreads))
throw new Exception($"Unable to call ThreadPool.SetMinThreads with {opts.ThreadPoolMinThreads}");
// Retry to set max threads if it wasn't set in the previous step
if (!setMax && !ThreadPool.SetMaxThreads(opts.ThreadPoolMaxThreads, opts.ThreadPoolMaxThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {opts.ThreadPoolMaxThreads}");
CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);
if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true);
CreateAOF();
logger?.LogTrace("TLS is {tlsEnabled}", opts.TlsOptions == null ? "disabled" : "enabled");
if (logger != null)
{
var configMemoryLimit = (store.IndexSize * 64) + store.Log.MaxMemorySizeBytes + (store.ReadCache?.MaxMemorySizeBytes ?? 0) + (appendOnlyFile?.MaxMemorySizeBytes ?? 0);
if (objectStore != null)
configMemoryLimit += objectStore.IndexSize * 64 + objectStore.Log.MaxMemorySizeBytes + (objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) + (objectStoreSizeTracker?.TargetSize ?? 0);
logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
}
// Create Garnet TCP server if none was provided.
this.server ??= new GarnetServerTcp(opts.Address, opts.Port, 0, opts.TlsOptions, opts.NetworkSendThrottleMax, logger);
storeWrapper = new StoreWrapper(version, redisProtocolVersion, server, store, objectStore, objectStoreSizeTracker,
customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory);
// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, subscribeBroker);
// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Register = new RegisterApi(Provider);
Store = new StoreApi(storeWrapper);
server.Register(WireFormat.ASCII, Provider);
LoadModules(customCommandManager);
}
private void LoadModules(CustomCommandManager customCommandManager)
{
if (opts.LoadModuleCS == null)
return;
foreach (var moduleCS in opts.LoadModuleCS)
{
var moduleCSData = moduleCS.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (moduleCSData.Length < 1)
continue;
var modulePath = moduleCSData[0];
var moduleArgs = moduleCSData.Length > 1 ? moduleCSData.Skip(1).ToArray() : [];
if (ModuleUtils.LoadAssemblies([modulePath], null, true, out var loadedAssemblies, out var errorMsg))
{
ModuleRegistrar.Instance.LoadModule(customCommandManager, loadedAssemblies.ToList()[0], moduleArgs, logger, out errorMsg);
}
else
{
logger?.LogError("Module {0} failed to load with error {1}", modulePath, Encoding.UTF8.GetString(errorMsg));
}
}
}
private void CreateMainStore(IClusterFactory clusterFactory, out string checkpointDir)
{
kvSettings = opts.GetSettings(loggerFactory, out logFactory);
checkpointDir = opts.CheckpointDir ?? opts.LogDir;
// Run checkpoint on its own thread to control p99
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
kvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;
var checkpointFactory = opts.DeviceFactoryCreator();
if (opts.EnableCluster)
{
kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(checkpointFactory,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true, logger);
}
else
{
kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(checkpointFactory,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
}
store = new(kvSettings
, StoreFunctions<SpanByte, SpanByte>.Create()
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
}
private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string CheckpointDir, out CacheSizeTracker objectStoreSizeTracker)
{
objectStoreSizeTracker = null;
if (!opts.DisableObjects)
{
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
out var objHeapMemorySize);
// Run checkpoint on its own thread to control p99
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
objKvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;
if (opts.EnableCluster)
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
opts.DeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
isMainStore: false, logger);
else
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
removeOutdated: true);
objectStore = new(objKvSettings
, StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
() => new ByteArrayBinaryObjectSerializer(),
() => new GarnetObjectSerializer(customCommandManager))
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
if (objHeapMemorySize > 0)
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objHeapMemorySize,
this.loggerFactory);
}
}
private void CreateAOF()
{
if (opts.EnableAOF)
{
if (opts.MainMemoryReplication && opts.CommitFrequencyMs != -1)
throw new Exception("Need to set CommitFrequencyMs to -1 (manual commits) with MainMemoryReplication");
opts.GetAofSettings(out var aofSettings);
aofDevice = aofSettings.LogDevice;
appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
throw new Exception("Cannot use CommitWait with manual commits");
return;
}
if (opts.CommitFrequencyMs != 0 || opts.WaitForCommit)
throw new Exception("Cannot use CommitFrequencyMs or CommitWait without EnableAOF");
}
/// <summary>
/// Start server instance
/// </summary>
public void Start()
{
Provider.Recover();
server.Start();
Provider.Start();
if (!opts.QuietMode)
Console.WriteLine("* Ready to accept connections");
}
/// <summary>
/// Dispose store (including log and checkpoint directory)
/// </summary>
public void Dispose()
{
Dispose(cleanupDir);
}
/// <summary>
/// Dispose, optionally deleting logs and checkpoints
/// </summary>
/// <param name="deleteDir">Whether to delete logs and checkpoints</param>
public void Dispose(bool deleteDir = true)
{
InternalDispose();
if (deleteDir)
{
logFactory?.Delete(new FileDescriptor { directoryName = "" });
if (opts.CheckpointDir != opts.LogDir && !string.IsNullOrEmpty(opts.CheckpointDir))
{
var ckptdir = opts.DeviceFactoryCreator();
ckptdir.Initialize(opts.CheckpointDir);
ckptdir.Delete(new FileDescriptor { directoryName = "" });
}
}
}
private void InternalDispose()
{
Provider?.Dispose();
server.Dispose();
subscribeBroker?.Dispose();
store.Dispose();
appendOnlyFile?.Dispose();
aofDevice?.Dispose();
kvSettings.LogDevice?.Dispose();
if (!opts.DisableObjects)
{
objectStore.Dispose();
objKvSettings.LogDevice?.Dispose();
objKvSettings.ObjectLogDevice?.Dispose();
}
opts.AuthSettings?.Dispose();
if (disposeLoggerFactory)
loggerFactory?.Dispose();
}
private static void DeleteDirectory(string path)
{
if (path == null) return;
// Exceptions may happen due to a handle briefly remaining held after Dispose().
try
{
foreach (string directory in Directory.GetDirectories(path))
{
DeleteDirectory(directory);
}
Directory.Delete(path, true);
}
catch (Exception ex) when (ex is IOException ||
ex is UnauthorizedAccessException)
{
try
{
Directory.Delete(path, true);
}
catch { }
}
}
/// <summary>
/// Flushes MemoryLogger entries into a destination logger.
/// Destination logger is either created from ILoggerFactory parameter or from a default console logger.
/// </summary>
/// <param name="memoryLogger">The memory logger</param>
/// <param name="categoryName">The category name of the destination logger</param>
/// <param name="dstLoggerFactory">Optional logger factory for creating the destination logger</param>
private static void FlushMemoryLogger(MemoryLogger memoryLogger, string categoryName, ILoggerFactory dstLoggerFactory = null)
{
if (memoryLogger == null) return;
// If no logger factory supplied, create a default console logger
var disposeDstLoggerFactory = false;
if (dstLoggerFactory == null)
{
dstLoggerFactory = LoggerFactory.Create(builder => builder.AddSimpleConsole(options =>
{
options.SingleLine = true;
options.TimestampFormat = "hh::mm::ss ";
}).SetMinimumLevel(LogLevel.Information));
disposeDstLoggerFactory = true;
}
// Create the destination logger
var dstLogger = dstLoggerFactory.CreateLogger(categoryName);
// Flush all entries from the memory logger into the destination logger
memoryLogger.FlushLogger(dstLogger);
// If a default console logger factory was created, it is no longer needed
if (disposeDstLoggerFactory)
{
dstLoggerFactory.Dispose();
}
}
}
}