From 6c9c78b73ccb01a00e8e0efa097f799aad06dd3c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 21 May 2025 20:27:58 +0700 Subject: [PATCH 1/2] Improve journal delete performance --- .../AkkaPersistenceDataConnectionFactory.cs | 2 +- .../Journal/Dao/BaseByteArrayJournalDao.cs | 41 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/Akka.Persistence.Sql/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql/Db/AkkaPersistenceDataConnectionFactory.cs index 01e2e7f2..7506d6a8 100644 --- a/src/Akka.Persistence.Sql/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql/Db/AkkaPersistenceDataConnectionFactory.cs @@ -42,7 +42,7 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig _useCloneDataConnection = config.UseCloneConnection; - if (_opts.RetryPolicyOptions.RetryPolicy is null && _opts.ConnectionOptions.ProviderName!.ToLowerInvariant().StartsWith("sqlserver")) + if (_opts.RetryPolicyOptions.RetryPolicy is null && _opts.RetryPolicyOptions.Factory is null && _opts.ConnectionOptions.ProviderName!.ToLowerInvariant().StartsWith("sqlserver")) _opts = _opts.WithOptions( _opts.RetryPolicyOptions with { RetryPolicy = new SqlServerRetryPolicy() } ); _cloneConnection = new Lazy( diff --git a/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs index bd24bbb4..2ede626c 100644 --- a/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs @@ -112,22 +112,34 @@ public async Task> AsyncWriteMessages( public async Task Delete(string persistenceId, long maxSequenceNr) { + long maxMarkedDeletion; + + // no need to use transaction + await using (var connection = ConnectionFactory.GetConnection()) + { + maxMarkedDeletion = await MaxMarkedForDeletionMaxPersistenceIdQuery(connection, persistenceId, maxSequenceNr).FirstOrDefaultAsync(ShutdownToken); + } + await ConnectionFactory.ExecuteWithTransactionAsync( WriteIsolationLevel, ShutdownToken, async (connection, token) => { - await connection - .GetTable() + var journalTable = connection.GetTable(); + await journalTable .Where( r => r.PersistenceId == persistenceId && - r.SequenceNumber <= maxSequenceNr) + r.SequenceNumber == maxMarkedDeletion) .Set(r => r.Deleted, true) .UpdateAsync(token); - var maxMarkedDeletion = - await MaxMarkedForDeletionMaxPersistenceIdQuery(connection, persistenceId).FirstOrDefaultAsync(token); + await journalTable + .Where( + r => + r.PersistenceId == persistenceId && + r.SequenceNumber < maxMarkedDeletion) + .DeleteAsync(token); if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode) { @@ -146,19 +158,7 @@ await connection SequenceNumber = maxMarkedDeletion, }, token: token); - } - await connection - .GetTable() - .Where( - r => - r.PersistenceId == persistenceId && - r.SequenceNumber <= maxSequenceNr && - r.SequenceNumber < maxMarkedDeletion) - .DeleteAsync(token); - - if (JournalConfig.DaoConfig.SqlCommonCompatibilityMode) - { await connection .GetTable() .Where( @@ -174,7 +174,7 @@ await connection .GetTable() .Where( r => - r.SequenceNumber <= maxSequenceNr && + r.SequenceNumber < maxMarkedDeletion && r.PersistenceId == persistenceId) .DeleteAsync(token); } @@ -436,10 +436,11 @@ protected static ImmutableList BuildWriteRejections(List MaxMarkedForDeletionMaxPersistenceIdQuery( AkkaDataConnection connection, - string persistenceId) + string persistenceId, + long maxSequenceNr) => connection .GetTable() - .Where(r => r.PersistenceId == persistenceId && r.Deleted) + .Where(r => r.PersistenceId == persistenceId && r.SequenceNumber <= maxSequenceNr) .OrderByDescending(r => r.SequenceNumber) .Select(r => r.SequenceNumber) .Take(1); From 9b5fb98c7e2bdd0bd9355a64057d10b1ff968264 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 21 May 2025 20:34:09 +0700 Subject: [PATCH 2/2] Optimize code --- .../Journal/Dao/BaseByteArrayJournalDao.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs index 2ede626c..0619fdb7 100644 --- a/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs @@ -119,6 +119,9 @@ public async Task Delete(string persistenceId, long maxSequenceNr) { maxMarkedDeletion = await MaxMarkedForDeletionMaxPersistenceIdQuery(connection, persistenceId, maxSequenceNr).FirstOrDefaultAsync(ShutdownToken); } + + if (maxMarkedDeletion is 0) + return; await ConnectionFactory.ExecuteWithTransactionAsync( WriteIsolationLevel,