diff --git a/cspell.json b/cspell.json index 24380f442a76..863c140ab6c9 100644 --- a/cspell.json +++ b/cspell.json @@ -134,6 +134,7 @@ "collectd", "colour", "commitset", + "compactable", "comparand", "concurrenc", "configurer", diff --git a/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageCompactorTests.cs b/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageCompactorTests.cs new file mode 100644 index 000000000000..82262f02ff95 --- /dev/null +++ b/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageCompactorTests.cs @@ -0,0 +1,182 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Db.LogIndex; +using Nethermind.Logging; +using NSubstitute; +using NUnit.Framework; +using static Nethermind.Db.LogIndex.LogIndexStorage; + +namespace Nethermind.Db.Test.LogIndex; + +[TestFixture] +[Parallelizable(ParallelScope.All)] +[FixtureLifeCycle(LifeCycle.InstancePerTestCase)] +public class LogIndexStorageCompactorTests +{ + private const int RaceConditionTestRepeat = 3; + + private static ILogIndexStorage MockStorage(int? min = null, int? max = null) + { + ILogIndexStorage storage = Substitute.For(); + storage.MinBlockNumber.Returns(min); + storage.MaxBlockNumber.Returns(max); + return storage; + } + + private static Compactor CreateCompactor(ILogIndexStorage storage, IDbMeta? db = null, int compactionDistance = 100) => + new(storage, db ?? 
new FakeDb(), LimboLogs.Instance.GetClassLogger(), compactionDistance); + + private static Compactor CreateCompactor(ILogIndexStorage storage, int compactionDistance = 100) => + CreateCompactor(storage, db: null, compactionDistance: compactionDistance); + + [TestCase(0, 50, 0, 50, 100, ExpectedResult = false, Description = "No change from baseline")] + [TestCase(0, 0, 0, 99, 100, ExpectedResult = false, Description = "99 blocks forward, threshold 100")] + [TestCase(0, 0, 0, 100, 100, ExpectedResult = true, Description = "Exactly at threshold")] + [TestCase(100, 200, 50, 250, 100, ExpectedResult = true, Description = "Both directions sum to threshold")] + public async Task TryEnqueue_Respects_CompactionDistance( + int initMin, int initMax, int newMin, int newMax, int compactionDistance + ) + { + ILogIndexStorage storage = MockStorage(min: initMin, max: initMax); + using Compactor compactor = CreateCompactor(storage, compactionDistance); + + storage.MinBlockNumber.Returns(newMin); + storage.MaxBlockNumber.Returns(newMax); + + bool result = compactor.TryEnqueue(); + + await compactor.StopAsync(); + return result; + } + + [Test] + [Repeat(RaceConditionTestRepeat)] + public async Task TryEnqueue_During_Compact_Does_Not_Run_Compact_Concurrently() + { + const int compactionDistance = 10; + var compactionDelay = TimeSpan.FromMilliseconds(200); + + ILogIndexStorage storage = MockStorage(min: 0, max: 0); + FakeDb db = new(compactionDelay); + using Compactor compactor = CreateCompactor(storage, db, compactionDistance); + + // Trigger first compaction + storage.MaxBlockNumber.Returns(compactionDistance); + Assert.That(compactor.TryEnqueue(), Is.True); + + await Task.Delay(compactionDelay / 4); + + // Try to cause a second compaction + storage.MaxBlockNumber.Returns(storage.MaxBlockNumber + compactionDistance); + compactor.TryEnqueue(); + + await compactor.ForceAsync(); + await compactor.StopAsync(); + } + + [TestCase(false)] + [TestCase(true)] + 
[Repeat(RaceConditionTestRepeat)] + [SuppressMessage("ReSharper", "AccessToDisposedClosure")] + public async Task ForceAsync_Does_Not_Run_Compact_Concurrently(bool duringCompact) + { + const int compactionDistance = 10; + var compactionDelay = TimeSpan.FromMilliseconds(200); + + ILogIndexStorage storage = MockStorage(min: 0, max: 0); + FakeDb db = new(compactionDelay); + using Compactor compactor = CreateCompactor(storage, db, compactionDistance); + + if (duringCompact) + { + storage.MaxBlockNumber.Returns(compactionDistance); + compactor.TryEnqueue(); + + await Task.Delay(compactionDelay / 4); + } + + const int concurrentCalls = 5; + await Task.WhenAll(Enumerable.Range(0, concurrentCalls).Select(_ => Task.Run(compactor.ForceAsync)).ToArray()); + + await compactor.StopAsync(); + } + + [Test] + public async Task TryEnqueue_Resets_Baseline_After_Enqueue() + { + const int compactionDistance = 10; + + ILogIndexStorage storage = MockStorage(min: 0, max: 0); + using Compactor compactor = CreateCompactor(storage, compactionDistance); + + storage.MaxBlockNumber.Returns(compactionDistance); + Assert.That(compactor.TryEnqueue(), Is.True); + + await Task.Delay(100); + + storage.MaxBlockNumber.Returns(storage.MaxBlockNumber + compactionDistance / 2); + Assert.That(compactor.TryEnqueue(), Is.False); + + storage.MaxBlockNumber.Returns(storage.MaxBlockNumber + compactionDistance); + Assert.That(compactor.TryEnqueue(), Is.True); + + await compactor.StopAsync(); + } + + [Test] + public async Task TryEnqueue_Returns_False_After_Stop() + { + const int compactionDistance = 10; + + ILogIndexStorage storage = MockStorage(min: 0, max: 0); + using Compactor compactor = CreateCompactor(storage, new NonCompactableDb(), compactionDistance); + + await compactor.StopAsync(); + + storage.MaxBlockNumber.Returns(compactionDistance); + Assert.That(compactor.TryEnqueue(), Is.False); + } + + // Fails on compaction attempt + private class NonCompactableDb : IDbMeta + { + private class 
CompactionException : Exception; + + public void Compact() => throw new CompactionException(); + public void Flush(bool onlyWal = false) { } + } + + // Simulates compaction with Thread.Sleep and fail on concurrent calls + private class FakeDb(TimeSpan? compactDelay = null) : IDbMeta + { + private class ConcurrentCompactionException : Exception; + + private readonly TimeSpan _compactDelay = compactDelay ?? TimeSpan.Zero; + + private int _compacting; + + public void Compact() + { + if (Interlocked.CompareExchange(ref _compacting, 1, 0) != 0) + throw new ConcurrentCompactionException(); + + try + { + if (_compactDelay > TimeSpan.Zero) + Thread.Sleep(_compactDelay); + } + finally + { + Interlocked.Exchange(ref _compacting, 0); + } + } + + public void Flush(bool onlyWal = false) { } + } +} diff --git a/src/Nethermind/Nethermind.Db/LogIndex/Compactor.cs b/src/Nethermind/Nethermind.Db/LogIndex/Compactor.cs index d5ed2dde970d..03561369a2dc 100644 --- a/src/Nethermind/Nethermind.Db/LogIndex/Compactor.cs +++ b/src/Nethermind/Nethermind.Db/LogIndex/Compactor.cs @@ -3,11 +3,13 @@ using System; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; -using Nethermind.Core.Extensions; using Nethermind.Logging; +[assembly: InternalsVisibleTo("Nethermind.Db.Test")] namespace Nethermind.Db.LogIndex; partial class LogIndexStorage @@ -15,26 +17,33 @@ partial class LogIndexStorage /// /// Periodically forces background log index compaction for every N added blocks. /// - private class Compactor : ICompactor + internal class Compactor : ICompactor { private int? _lastAtMin; private int? 
_lastAtMax; private CompactingStats _stats = new(); - private readonly LogIndexStorage _storage; + private readonly ILogIndexStorage _storage; + private readonly IDbMeta _rootDb; private readonly ILogger _logger; private readonly int _compactionDistance; - // TODO: simplify concurrency handling - private readonly AutoResetEvent _runOnceEvent = new(false); private readonly CancellationTokenSource _cts = new(); - private readonly ManualResetEvent _compactionStartedEvent = new(false); - private readonly ManualResetEvent _compactionEndedEvent = new(true); + + /// + /// Bounded(1) compaction work queue consumed by .
+ /// null — fire-and-forget compaction enqueued by <see cref="TryEnqueue"/>;
+ /// not null — caller-awaitable compaction enqueued by <see cref="ForceAsync"/>. + ///
+ private readonly Channel _channel = Channel.CreateBounded(1); + + private volatile TaskCompletionSource? _pendingForcedCompaction; private readonly Task _compactionTask; - public Compactor(LogIndexStorage storage, ILogger logger, int compactionDistance) + public Compactor(ILogIndexStorage storage, IDbMeta rootDb, ILogger logger, int compactionDistance) { _storage = storage; + _rootDb = rootDb; _logger = logger; if (compactionDistance < 1) throw new ArgumentException("Compaction distance must be a positive value.", nameof(compactionDistance)); @@ -66,7 +75,7 @@ public bool TryEnqueue() if (uncompacted < _compactionDistance) return false; - if (!_runOnceEvent.Set()) + if (!_channel.Writer.TryWrite(null)) return false; _lastAtMin = _storage.MinBlockNumber; @@ -77,63 +86,102 @@ public bool TryEnqueue() public async Task StopAsync() { await _cts.CancelAsync(); - await _compactionEndedEvent.WaitOneAsync(CancellationToken.None); + _channel.Writer.TryComplete(); + await _compactionTask; } public async Task ForceAsync() { - // Wait for the previous one to finish - await _compactionEndedEvent.WaitOneAsync(_cts.Token); + // Coalesce concurrent calls — all callers share a single compaction + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource? 
existing = Interlocked.CompareExchange(ref _pendingForcedCompaction, tcs, null); + + if (existing is not null) + { + await existing.Task; + return _stats; + } + + try + { + await _channel.Writer.WriteAsync(tcs, _cts.Token); + } + catch (Exception ex) + { + Interlocked.CompareExchange(ref _pendingForcedCompaction, null, tcs); + if (ex is OperationCanceledException) + tcs.TrySetCanceled(); + else + tcs.TrySetException(ex); + + throw; + } - _runOnceEvent.Set(); - await _compactionStartedEvent.WaitOneAsync(100, _cts.Token); - await _compactionEndedEvent.WaitOneAsync(_cts.Token); + await tcs.Task; return _stats; } private async Task DoCompactAsync() { CancellationToken cancellation = _cts.Token; - while (!cancellation.IsCancellationRequested) + try { - try - { - await _runOnceEvent.WaitOneAsync(cancellation); - - _compactionEndedEvent.Reset(); - _compactionStartedEvent.Set(); - - if (cancellation.IsCancellationRequested) - return; - - if (_logger.IsInfo) - _logger.Info($"Log index: compaction started, DB size: {_storage.GetDbSize()}"); - - var timestamp = Stopwatch.GetTimestamp(); - _storage._rootDb.Compact(); - - TimeSpan elapsed = Stopwatch.GetElapsedTime(timestamp); - _stats.Total.Include(elapsed); - - if (_logger.IsInfo) - _logger.Info($"Log index: compaction ended in {elapsed}, DB size: {_storage.GetDbSize()}"); - } - catch (OperationCanceledException ex) when (ex.CancellationToken == cancellation) - { - return; - } - catch (Exception ex) + await foreach (TaskCompletionSource? 
tcs in _channel.Reader.ReadAllAsync(cancellation)) { - _storage.OnBackgroundError(ex); - await _cts.CancelAsync(); + try + { + if (_logger.IsInfo) _logger.Info($"Log index: compaction started, DB size: {_storage.GetDbSize()}"); + + var timestamp = Stopwatch.GetTimestamp(); + _rootDb.Compact(); + + TimeSpan elapsed = Stopwatch.GetElapsedTime(timestamp); + _stats.Total.Include(elapsed); + + if (_logger.IsInfo) _logger.Info($"Log index: compaction ended in {elapsed}, DB size: {_storage.GetDbSize()}"); + + tcs?.TrySetResult(); + } + catch (OperationCanceledException) + { + tcs?.TrySetCanceled(); + } + catch (Exception ex) + { + tcs?.TrySetException(ex); + (_storage as LogIndexStorage)?.OnBackgroundError(ex); + + await _cts.CancelAsync(); + _channel.Writer.TryComplete(); + + break; + } + finally + { + if (tcs is not null) + Interlocked.CompareExchange(ref _pendingForcedCompaction, null, tcs); + } } - finally + } + catch (OperationCanceledException) + { + if (_logger.IsDebug) _logger.Debug("Log index: compaction loop canceled"); + } + finally + { + while (_channel.Reader.TryRead(out TaskCompletionSource? 
remaining) && remaining is not null) { - _compactionStartedEvent.Reset(); - _compactionEndedEvent.Set(); + remaining.TrySetCanceled(); + Interlocked.CompareExchange(ref _pendingForcedCompaction, null, remaining); } } } + + public void Dispose() + { + _cts.Dispose(); + _channel.Writer.TryComplete(); + } } private class NoOpCompactor : ICompactor @@ -142,9 +190,10 @@ private class NoOpCompactor : ICompactor public bool TryEnqueue() => false; public Task StopAsync() => Task.CompletedTask; public Task ForceAsync() => Task.FromResult(new CompactingStats()); + public void Dispose() { } } - private interface ICompactor + internal interface ICompactor : IDisposable { CompactingStats GetAndResetStats(); bool TryEnqueue(); diff --git a/src/Nethermind/Nethermind.Db/LogIndex/LogIndexStorage.cs b/src/Nethermind/Nethermind.Db/LogIndex/LogIndexStorage.cs index 0fc3cdb41801..bb628b8407fd 100644 --- a/src/Nethermind/Nethermind.Db/LogIndex/LogIndexStorage.cs +++ b/src/Nethermind/Nethermind.Db/LogIndex/LogIndexStorage.cs @@ -236,10 +236,6 @@ public LogIndexStorage(IDbFactory dbFactory, ILogManager logManager, ILogIndexCo ? new Compressor(this, config.CompressionDistance, config.MaxCompressionParallelism) : new NoOpCompressor(); - _compactor = config.CompactionDistance > 0 - ? new Compactor(this, _logger, config.CompactionDistance) - : new NoOpCompactor(); - for (int i = -1; i < MaxTopics; i++) _mergeOperators[i + 1] = new MergeOperator(this, _compressor, topicIndex: i < 0 ? null : i); @@ -248,6 +244,10 @@ public LogIndexStorage(IDbFactory dbFactory, ILogManager logManager, ILogIndexCo _addressDb = _rootDb.GetColumnDb(LogIndexColumns.Addresses); _topicDbs = Enumerable.Range(0, MaxTopics).Select(topicIndex => _rootDb.GetColumnDb(GetColumn(topicIndex))).ToArray(); + _compactor = config.CompactionDistance > 0 + ? 
new Compactor(this, _rootDb, _logger, config.CompactionDistance) + : new NoOpCompactor(); + _compressionAlgorithm = SelectCompressionAlgorithm(config.CompressionAlgorithm); (_minBlock, _maxBlock) = (LoadRangeBound(SpecialKey.MinBlockNum), LoadRangeBound(SpecialKey.MaxBlockNum)); @@ -390,7 +390,7 @@ private async Task StopAsync(bool acquireLock) try { - // Disposing RocksDB during any write operation will cause 0xC0000005 + // Disposing RocksDB during any write operation will cause 0xC0000005, so stop them all await Task.WhenAll( _compactor.StopAsync(), _compressor.StopAsync() @@ -449,6 +449,7 @@ private void DisposeCore() _forwardWriteSemaphore.Dispose(); _backwardWriteSemaphore.Dispose(); _compressor?.Dispose(); + _compactor?.Dispose(); DBColumns?.DisposeItems(); _rootDb?.Dispose(); }