diff --git a/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageIntegrationTests.cs b/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageIntegrationTests.cs index 99279a129cdd..86e75574ea49 100644 --- a/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageIntegrationTests.cs +++ b/src/Nethermind/Nethermind.Db.Test/LogIndex/LogIndexStorageIntegrationTests.cs @@ -29,9 +29,6 @@ namespace Nethermind.Db.Test.LogIndex { - // TODO: test for reorg out-of-order - // TODO: test for concurrent reorg and backward sync - // TODO: test for background job failure [TestFixtureSource(nameof(TestCases))] [Parallelizable(ParallelScope.All)] [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] @@ -60,7 +57,7 @@ public class LogIndexStorageIntegrationTests(LogIndexStorageIntegrationTests.Tes private ILogIndexStorage CreateLogIndexStorage( int compactionDistance = 262_144, int compressionParallelism = 16, int maxReorgDepth = 64, IDbFactory? dbFactory = null, - string? compressionAlgo = null, int? failOnBlock = null, int? failOnCallN = null + string? compressionAlgo = null, int? failOnBlock = null, int? failOnCallN = null, bool failOnMerge = false ) { LogIndexConfig config = new() @@ -72,16 +69,16 @@ private ILogIndexStorage CreateLogIndexStorage( CompressionAlgorithm = compressionAlgo ?? testData.Compression }; - ILogIndexStorage storage = failOnBlock is not null || failOnCallN is not null - ? new SaveFailingLogIndexStorage(dbFactory ?? _dbFactory, LimboLogs.Instance, config) + ILogIndexStorage storage = failOnBlock is null && failOnCallN is null + ? new LogIndexStorage(dbFactory ?? _dbFactory, LimboLogs.Instance, config) + : new SaveFailingLogIndexStorage(dbFactory ?? _dbFactory, LimboLogs.Instance, config) { FailOnBlock = failOnBlock ?? 0, - FailOnCallN = failOnCallN ?? 0 - } - : new LogIndexStorage(dbFactory ?? _dbFactory, LimboLogs.Instance, config); + FailOnCallN = failOnCallN ?? 0, + FailOnMerge = failOnMerge + }; - _createdStorages.Add(storage); - return storage; + return storage.AddTo(_createdStorages); } [SetUp] @@ -146,7 +143,7 @@ public async Task Set_Get_Test( [Values] bool compact ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance, ioParallelism); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance, ioParallelism); BlockReceipts[][] batches = isBackwardsSync ? Reverse(testData.Batches) : testData.Batches; await AddReceiptsAsync(logIndexStorage, batches, isBackwardsSync); @@ -165,7 +162,7 @@ public async Task SetIntersecting_Get_Test( [Values] bool compact ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance, ioParallelism); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance, ioParallelism); BlockReceipts[][] batches = isBackwardsSync ? Reverse(testData.Batches) : testData.Batches; batches = Intersect(batches); @@ -182,9 +179,9 @@ public async Task BackwardsSet_Set_Get_Test( [Values(100, 200, int.MaxValue)] int compactionDistance ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); - var batches = testData.Batches; + BlockReceipts[][] batches = testData.Batches; var half = batches.Length / 2; for (var i = 0; i < half + 1; i++) @@ -205,16 +202,16 @@ public async Task Concurrent_BackwardsSet_Set_Get_Test( [Values(100, int.MaxValue)] int compactionDistance ) { - await using (var setStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage setStorage = CreateLogIndexStorage(compactionDistance)) { - var half = testData.Batches.Length / 2; - var batches = testData.Batches + int half = testData.Batches.Length / 2; + BlockReceipts[][] batches = testData.Batches .Select((b, i) => i >= half ? b : b.Reverse().ToArray()) .ToArray(); var forwardTask = Task.Run(async () => { - for (var i = half; i < batches.Length; i++) + for (int i = half; i < batches.Length; i++) { BlockReceipts[] batch = batches[i]; await AddReceiptsAsync(setStorage, [batch], isBackwardsSync: false); @@ -226,7 +223,7 @@ public async Task Concurrent_BackwardsSet_Set_Get_Test( var backwardTask = Task.Run(async () => { - for (var i = half - 1; i >= 0; i--) + for (int i = half - 1; i >= 0; i--) { BlockReceipts[] batch = batches[i]; await AddReceiptsAsync(setStorage, [batch], isBackwardsSync: true); @@ -236,22 +233,57 @@ public async Task Concurrent_BackwardsSet_Set_Get_Test( } }); - await forwardTask; - await backwardTask; + await Task.WhenAll(forwardTask, backwardTask); } // Create new storage to force-load everything from DB - await using (var testStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage testStorage = CreateLogIndexStorage(compactionDistance)) VerifyReceipts(testStorage, testData); } + [Combinatorial] + [Repeat(RaceConditionTestRepeat)] + [SuppressMessage("ReSharper", "AccessToDisposedClosure")] + public async Task Concurrent_BackwardSet_Reorg_Get_Test( + [Values(1, 5, 10)] int reorgDepth, + [Values(100, int.MaxValue)] int compactionDistance + ) + { + int half = testData.Batches.Length / 2; + BlockReceipts[][] forwardBatches = testData.Batches.Skip(half).ToArray(); + BlockReceipts[][] backwardBatches = testData.Batches.Take(half).ToArray(); + + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); + + // Add forward blocks first to establish the head of the chain + await AddReceiptsAsync(logIndexStorage, forwardBatches, isBackwardsSync: false); + + BlockReceipts[] reorgBlocks = forwardBatches.SelectMany(b => b).TakeLast(reorgDepth).ToArray(); + + var reorgTask = Task.Run(async () => + { + foreach (BlockReceipts block in reorgBlocks) + await logIndexStorage.RemoveReorgedAsync(block); + }); + + var backwardTask = Task.Run(async () => + { + foreach (BlockReceipts[] batch in Reverse(backwardBatches)) + await AddReceiptsAsync(logIndexStorage, [batch], isBackwardsSync: true); + }); + + await Task.WhenAll(reorgTask, backwardTask); + + VerifyReceipts(logIndexStorage, testData, excludedBlocks: reorgBlocks, maxBlock: reorgBlocks[0].BlockNumber - 1); + } + [Combinatorial] public async Task Set_ReorgLast_Get_Test( [Values(1, 5, 20)] int reorgDepth, [Values(100, int.MaxValue)] int compactionDistance ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -262,13 +294,34 @@ public async Task Set_ReorgLast_Get_Test( VerifyReceipts(logIndexStorage, testData, excludedBlocks: reorgBlocks, maxBlock: reorgBlocks[0].BlockNumber - 1); } + [Ignore("Out-of-order reorgs are not supported ATM: only the first (by write order) Reorg operand per key is applied by MergeOperator.")] + [Combinatorial] + public async Task Set_ReorgOutOfOrder_Get_Test( + [Values(2, 5, 10)] int reorgDepth, + [Values(100, int.MaxValue)] int compactionDistance + ) + { + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); + + await AddReceiptsAsync(logIndexStorage, testData.Batches); + + BlockReceipts[] reverseReorg = testData.Batches.SelectMany(b => b).Reverse().Take(reorgDepth).ToArray(); + + foreach (BlockReceipts block in reverseReorg) + await logIndexStorage.RemoveReorgedAsync(block); + + // Full data verification: would fail because MergeOperator only applies the first Reorg operand + // (the highest block in descending order), leaving intermediate blocks as stale data. + VerifyReceipts(logIndexStorage, testData, excludedBlocks: reverseReorg, maxBlock: reverseReorg[0].BlockNumber - 1); + } + [Combinatorial] public async Task Set_ReorgAndSetLast_Get_Test( [Values(1, 5, 20)] int reorgDepth, [Values(100, int.MaxValue)] int compactionDistance ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -288,7 +341,7 @@ public async Task Set_ReorgLast_SetLast_Get_Test( [Values(100, int.MaxValue)] int compactionDistance ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -308,7 +361,7 @@ public async Task Set_ReorgNonexistent_Get_Test( [Values(100, int.MaxValue)] int compactionDistance ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -329,7 +382,7 @@ public async Task Set_ReorgNonexistent_Get_Test( [TestCase(65, 64, Explicit = true)] public async Task Set_Compact_ReorgLast_Get_Test(int reorgDepth, int maxReorgDepth) { - var logIndexStorage = CreateLogIndexStorage(maxReorgDepth: maxReorgDepth); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(maxReorgDepth: maxReorgDepth); await AddReceiptsAsync(logIndexStorage, testData.Batches); await CompactAsync(logIndexStorage); @@ -349,7 +402,7 @@ public async Task Set_PeriodicReorg_Get_Test( [Values] bool compactAfter ) { - var logIndexStorage = CreateLogIndexStorage(); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(); var random = new Random(42); var allReorgBlocks = new List(); @@ -385,11 +438,11 @@ public async Task Set_ConsecutiveReorgsLast_Get_Test( [Values] bool compactBetween ) { - var logIndexStorage = CreateLogIndexStorage(); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(); await AddReceiptsAsync(logIndexStorage, testData.Batches); - var testBlocks = testData.Batches.SelectMany(b => b).ToArray(); + BlockReceipts[] testBlocks = testData.Batches.SelectMany(b => b).ToArray(); foreach (var reorgDepth in reorgDepths) { @@ -411,13 +464,13 @@ [Values] bool isBackwardsSync { var half = testData.Batches.Length / 2; - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) await AddReceiptsAsync(logIndexStorage, testData.Batches.Take(half)); - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) await AddReceiptsAsync(logIndexStorage, testData.Batches.Skip(half)); - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) VerifyReceipts(logIndexStorage, testData); } @@ -427,7 +480,7 @@ public async Task RepeatedSet_Get_Test( [Values] bool isBackwardsSync ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); await AddReceiptsAsync(logIndexStorage, testData.Batches); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -441,13 +494,13 @@ public async Task RepeatedSetMultiInstance_Get_Test( [Values] bool isBackwardsSync ) { - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) await AddReceiptsAsync(logIndexStorage, testData.Batches); - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) await AddReceiptsAsync(logIndexStorage, testData.Batches); - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) VerifyReceipts(logIndexStorage, testData); } @@ -458,10 +511,10 @@ public async Task Set_NewInstance_Get_Test( [Values] bool isBackwardsSync ) { - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) await AddReceiptsAsync(logIndexStorage, testData.Batches); - await using (var logIndexStorage = CreateLogIndexStorage(compactionDistance)) + await using (ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance)) VerifyReceipts(logIndexStorage, testData); } @@ -473,18 +526,18 @@ public async Task Set_ConcurrentGet_Test( [Values] bool isBackwardsSync ) { - var logIndexStorage = CreateLogIndexStorage(compactionDistance); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(compactionDistance); using var getCancellation = new CancellationTokenSource(); - var token = getCancellation.Token; + CancellationToken token = getCancellation.Token; ConcurrentBag exceptions = []; - var getThreads = new[] - { - new Thread(() => VerifyReceiptsPartialLoop(new Random(42), logIndexStorage, testData, exceptions, token)), - new Thread(() => VerifyReceiptsPartialLoop(new Random(4242), logIndexStorage, testData, exceptions, token)), - new Thread(() => VerifyReceiptsPartialLoop(new Random(424242), logIndexStorage, testData, exceptions, token)), - }; + Thread[] getThreads = + [ + new(() => VerifyReceiptsPartialLoop(new Random(42), logIndexStorage, testData, exceptions, token)), + new(() => VerifyReceiptsPartialLoop(new Random(4242), logIndexStorage, testData, exceptions, token)), + new(() => VerifyReceiptsPartialLoop(new Random(424242), logIndexStorage, testData, exceptions, token)) + ]; getThreads.ForEach(t => t.Start()); await AddReceiptsAsync(logIndexStorage, testData.Batches); @@ -507,7 +560,7 @@ [Values] bool isBackwardsSync BlockReceipts[][] batches = isBackwardsSync ? Reverse(testData.Batches) : testData.Batches; var midBlock = testData.Batches[^1][^1].BlockNumber / 2; - await using var failLogIndexStorage = CreateLogIndexStorage(failOnBlock: midBlock, failOnCallN: failOnCallN); + await using ILogIndexStorage failLogIndexStorage = CreateLogIndexStorage(failOnBlock: midBlock, failOnCallN: failOnCallN); Exception exception = Assert.ThrowsAsync(() => AddReceiptsAsync(failLogIndexStorage, batches, isBackwardsSync)); Assert.That(exception, Has.Message.EqualTo(SaveFailingLogIndexStorage.FailMessage)); @@ -527,18 +580,63 @@ [Values] bool isBackwardsSync BlockReceipts[][] batches = isBackwardsSync ? Reverse(testData.Batches) : testData.Batches; var midBlock = testData.Batches[^1][^1].BlockNumber / 2; - await using (var failLogIndexStorage = CreateLogIndexStorage(failOnBlock: midBlock, failOnCallN: failOnCallN)) + await using (ILogIndexStorage failLogIndexStorage = CreateLogIndexStorage(failOnBlock: midBlock, failOnCallN: failOnCallN)) { Exception exception = Assert.ThrowsAsync(() => AddReceiptsAsync(failLogIndexStorage, batches, isBackwardsSync)); Assert.That(exception, Has.Message.EqualTo(SaveFailingLogIndexStorage.FailMessage)); } - await using var logIndexStorage = CreateLogIndexStorage(); + await using ILogIndexStorage logIndexStorage = CreateLogIndexStorage(); await AddReceiptsAsync(logIndexStorage, batches, isBackwardsSync); VerifyReceipts(logIndexStorage, testData); } + [Combinatorial] + [SuppressMessage("ReSharper", "AccessToDisposedClosure")] + public async Task SetMergeFailure_AnyWrite_Test( + [Values(1, 20, 51, 100)] int failOnCallN + ) + { + var midBlock = testData.Batches[^1][^1].BlockNumber / 2; + await using var storage = (LogIndexStorage)CreateLogIndexStorage(failOnBlock: midBlock, failOnCallN: failOnCallN, failOnMerge: true); + + try + { + await AddReceiptsAsync(storage, testData.Batches); + + // force compaction if the error hasn't propagated already + await storage.CompactAsync(); + } + catch (LogIndexStateException) + { + // Expected + } + + Assert.That(storage.HasBackgroundError, Is.True); + + IEnumerable sinceLastBatch = testData.Batches.SelectMany(b => b) + .SkipWhile(b => b.BlockNumber < storage.MaxBlockNumber); + + await Assert.ThatAsync( + () => storage.AddReceiptsAsync(sinceLastBatch.Skip(1).Take(10).ToArray(), false), + Throws.InstanceOf().And.Message.Contain("merge") + ); + + await Assert.ThatAsync( + () => storage.RemoveReorgedAsync(sinceLastBatch.First()), + Throws.InstanceOf().And.Message.Contain("merge") + ); + + await Assert.ThatAsync( + () => storage.CompactAsync(), + Throws.InstanceOf().And.Message.Contain("merge") + ); + + Assert.DoesNotThrowAsync(() => storage.StopAsync()); + Assert.DoesNotThrowAsync(() => storage.StopAsync()); + } + [Combinatorial] public async Task Set_AlgoChange_Test() { @@ -560,10 +658,10 @@ private static BlockReceipts[] GenerateBlocks(Random random, int from, int count private static async Task AddReceiptsAsync(ILogIndexStorage logIndexStorage, IEnumerable batches, bool isBackwardsSync = false) { - var timestamp = Stopwatch.GetTimestamp(); + long timestamp = Stopwatch.GetTimestamp(); - var totalStats = new LogIndexUpdateStats(logIndexStorage); - var (count, length) = (0, 0); + LogIndexUpdateStats totalStats = new(logIndexStorage); + (int count, int length) = (0, 0); foreach (BlockReceipts[] batch in batches) { count++; @@ -605,7 +703,7 @@ private static void VerifyReceipts(ILogIndexStorage logIndexStorage, TestData te } } - foreach (var (address, blocks) in testData.AddressMap) + foreach ((Address address, HashSet blocks) in testData.AddressMap) { IEnumerable expectedBlocks = blocks; @@ -635,9 +733,9 @@ private static void VerifyReceipts(ILogIndexStorage logIndexStorage, TestData te } } - foreach (var (idx, byTopic) in testData.TopicMap) + foreach ((int idx, Dictionary> byTopic) in testData.TopicMap) { - foreach (var (topic, blocks) in byTopic) + foreach ((Hash256 topic, HashSet blocks) in byTopic) { IEnumerable expectedBlocks = blocks; @@ -691,14 +789,14 @@ private static void VerifyReceiptsPartialLoop(Random random, ILogIndexStorage lo { try { - var (addresses, topics) = (testData.Addresses, testData.Topics); + (List
addresses, List<(int, Hash256)> topics) = (testData.Addresses, testData.Topics); while (!cancellationToken.IsCancellationRequested) { if (addresses.Count != 0) { - var address = random.NextFrom(addresses); - var expectedBlocks = testData.AddressMap[address]; + Address address = random.NextFrom(addresses); + HashSet expectedBlocks = testData.AddressMap[address]; if (logIndexStorage.MinBlockNumber is not { } min || logIndexStorage.MaxBlockNumber is not { } max) continue; @@ -712,8 +810,8 @@ private static void VerifyReceiptsPartialLoop(Random random, ILogIndexStorage lo if (topics.Count != 0) { - var (idx, topic) = random.NextFrom(topics); - var expectedBlocks = testData.TopicMap[idx][topic]; + (int idx, Hash256 topic) = random.NextFrom(topics); + HashSet expectedBlocks = testData.TopicMap[idx][topic]; if (logIndexStorage.MinBlockNumber is not { } min || logIndexStorage.MaxBlockNumber is not { } max) continue; @@ -732,12 +830,12 @@ private static void VerifyReceiptsPartialLoop(Random random, ILogIndexStorage lo } } - private static BlockReceipts[][] Reverse(IEnumerable batches) + private static BlockReceipts[][] Reverse(BlockReceipts[][] batches) { - var length = batches.Count(); - var result = new BlockReceipts[length][]; + int length = batches.Length; + BlockReceipts[][] result = new BlockReceipts[length][]; - var index = 0; + int index = 0; foreach (BlockReceipts[] batch in batches.Reverse()) result[index++] = batch.Reverse().ToArray(); @@ -746,9 +844,9 @@ private static BlockReceipts[][] Reverse(IEnumerable batches) private static BlockReceipts[][] Intersect(BlockReceipts[][] batches) { - var result = new BlockReceipts[batches.Length + 1][]; + BlockReceipts[][] result = new BlockReceipts[batches.Length + 1][]; - for (var i = 0; i < result.Length; i++) + for (int i = 0; i < result.Length; i++) { if (i == 0) result[i] = batches[i]; @@ -763,7 +861,7 @@ private static BlockReceipts[][] Intersect(BlockReceipts[][] batches) private static async Task CompactAsync(ILogIndexStorage logIndexStorage) { - var timestamp = Stopwatch.GetTimestamp(); + long timestamp = Stopwatch.GetTimestamp(); await ((LogIndexStorage)logIndexStorage).CompactAsync(); if (LogStatistics) @@ -851,24 +949,24 @@ private BlockReceipts[][] GenerateBatches(Random random, int batchCount, int blo new(new byte[] { 0 }.PadLeft(Hash256.Size, 0xFF)), new(new byte[] { 0 }.PadRight(Hash256.Size, 0xFF)), ]; - var addresses = Enumerable.Repeat(0, Math.Max(10, blocksCount / 5) - customAddresses.Length) + Address[] addresses = Enumerable.Repeat(0, Math.Max(10, blocksCount / 5) - customAddresses.Length) //var addresses = Enumerable.Repeat(0, 0) .Select(_ => new Address(random.NextBytes(Address.Size))) .Concat(customAddresses) .ToArray(); - var topics = Enumerable.Repeat(0, addresses.Length * 7 - customTopics.Length) + Hash256[] topics = Enumerable.Repeat(0, addresses.Length * 7 - customTopics.Length) //var topics = Enumerable.Repeat(0, 0) .Select(_ => new Hash256(random.NextBytes(Hash256.Size))) .Concat(customTopics) .ToArray(); // Generate batches - var blockNum = startNum; - for (var i = 0; i < batches.Length; i++) + int blockNum = startNum; + for (int i = 0; i < batches.Length; i++) { - var batch = batches[i] = new BlockReceipts[blocksPerBatch]; + BlockReceipts[] batch = batches[i] = new BlockReceipts[blocksPerBatch]; - for (var j = 0; j < batch.Length; j++) + for (int j = 0; j < batch.Length; j++) batch[j] = new(blockNum++, GenerateReceipts(random, addresses, topics)); } @@ -885,22 +983,22 @@ private BlockReceipts[][] GenerateBatches(Random random, int batchCount, int blo public static (Dictionary> address, Dictionary>> topic) GenerateMaps( IEnumerable blocks) { - var address = new Dictionary>(); - var topic = new Dictionary>>(); + Dictionary> address = new(); + Dictionary>> topic = new(); - foreach (var block in blocks) + foreach (BlockReceipts block in blocks) { - foreach (var txReceipt in block.Receipts) + foreach (TxReceipt txReceipt in block.Receipts) { - foreach (var log in txReceipt.Logs!) + foreach (LogEntry log in txReceipt.Logs!) { - var addressMap = address.GetOrAdd(log.Address, static _ => []); + HashSet addressMap = address.GetOrAdd(log.Address, static _ => []); addressMap.Add(block.BlockNumber); - for (var i = 0; i < log.Topics.Length; i++) + for (int i = 0; i < log.Topics.Length; i++) { - var topicI = topic.GetOrAdd(i, static _ => []); - var topicMap = topicI.GetOrAdd(log.Topics[i], static _ => []); + Dictionary> topicI = topic.GetOrAdd(i, static _ => []); + HashSet topicMap = topicI.GetOrAdd(log.Topics[i], static _ => []); topicMap.Add(block.BlockNumber); } } @@ -925,11 +1023,11 @@ private static TxReceipt[] GenerateReceipts(Random random, Address[] addresses, ).TestObject ).ToArray(); - var receipts = new List(); + List receipts = new(); for (var i = 0; i < logs.Length;) { - var count = random.Next(logsPerTx.min, Math.Min(logsPerTx.max, logs.Length - i) + 1); - var range = i..(i + count); + int count = random.Next(logsPerTx.min, Math.Min(logsPerTx.max, logs.Length - i) + 1); + Range range = i..(i + count); receipts.Add(new() { Logs = logs[range] }); i = range.End.Value; @@ -940,15 +1038,15 @@ private static TxReceipt[] GenerateReceipts(Random random, Address[] addresses, private static HashSet<(int from, int to)> GenerateSimpleRanges(int min, int max) { - var quarter = (max - min) / 4; + int quarter = (max - min) / 4; return [(0, int.MaxValue), (min, max), (min + quarter, max - quarter)]; } private static HashSet<(int from, int to)> GenerateExtendedRanges(int min, int max) { - var ranges = new HashSet<(int, int)>(); + HashSet<(int, int)> ranges = new(); - var edges = new[] { min - 1, min, min + 1, max - 1, max + 1 }; + int[] edges = [min - 1, min, min + 1, max - 1, max + 1]; ranges.AddRange(edges.SelectMany(_ => edges, static (x, y) => (x, y))); const int step = 100; @@ -980,8 +1078,10 @@ private class SaveFailingLogIndexStorage(IDbFactory dbFactory, ILogManager logMa public int FailOnBlock { get; init; } public int FailOnCallN { get; init; } + public bool FailOnMerge { get; init; } private int _count; + private bool _corrupted; protected override void MergeBlockNumbers(IWriteBatch dbBatch, ReadOnlySpan key, List numbers, bool isBackwardSync, LogIndexUpdateStats? stats) { @@ -989,8 +1089,19 @@ protected override void MergeBlockNumbers(IWriteBatch dbBatch, ReadOnlySpan= Math.Min(numbers[0], numbers[^1]) && FailOnBlock <= Math.Max(numbers[0], numbers[^1]); - if (isFailBlock && Interlocked.Increment(ref _count) >= FailOnCallN) - throw new(FailMessage); + if (isFailBlock && Interlocked.Increment(ref _count) >= FailOnCallN && !Interlocked.Exchange(ref _corrupted, true)) + { + if (FailOnMerge) + { + // Force "invalid order" in MergeOperator + int invalidBlockNum = isBackwardSync ? int.MaxValue : 0; + base.MergeBlockNumbers(dbBatch, key, [invalidBlockNum], isBackwardSync, stats); + } + else + { + throw new(FailMessage); + } + } base.MergeBlockNumbers(dbBatch, key, numbers, isBackwardSync, stats); }