From 9308de3e537cce718f51726c28158cd52ed96a14 Mon Sep 17 00:00:00 2001 From: Michael Musgrove Date: Thu, 24 Aug 2023 10:41:19 +0100 Subject: [PATCH] JBTM-3795 recover memory when an LRA terminates --- .../domain/service/LRAService.java | 43 ++++++++++++++----- .../lra/coordinator/domain/model/LRATest.java | 22 ++++++++++ rts/lra/test/pom.xml | 1 + 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/rts/lra/coordinator/src/main/java/io/narayana/lra/coordinator/domain/service/LRAService.java b/rts/lra/coordinator/src/main/java/io/narayana/lra/coordinator/domain/service/LRAService.java index 7c95c4c989..0297acc098 100644 --- a/rts/lra/coordinator/src/main/java/io/narayana/lra/coordinator/domain/service/LRAService.java +++ b/rts/lra/coordinator/src/main/java/io/narayana/lra/coordinator/domain/service/LRAService.java @@ -59,7 +59,7 @@ public class LRAService { private final Map lras = new ConcurrentHashMap<>(); private final Map recoveringLRAs = new ConcurrentHashMap<>(); private final Map locks = new ConcurrentHashMap<>(); - private final Map participants = new ConcurrentHashMap<>(); + private final Map> lraParticipants = new ConcurrentHashMap<>(); private LRARecoveryModule recoveryModule; public LongRunningAction getTransaction(URI lraId) throws NotFoundException { @@ -211,7 +211,11 @@ public void remove(LongRunningAction lra) { public void remove(URI lraId) { lraTrace(lraId, "remove LRA"); - lras.remove(lraId); + LongRunningAction lra = lras.remove(lraId); + + if (lra != null) { + lraParticipants.remove(lra); + } recoveringLRAs.remove(lraId); @@ -225,18 +229,35 @@ public void recover() { public void updateRecoveryURI(URI lraId, String compensatorUrl, String recoveryURI, boolean persist) { assert recoveryURI != null; assert compensatorUrl != null; + LongRunningAction transaction = getTransaction(lraId); + Map participants = lraParticipants.get(transaction); + + // the collection should be thread safe against update requests, even though such concurrent + // updates are improbable because only LRAService.joinLRA and RecoveryCoordinator.replaceCompensator + // do updates but those are sequential operations anyway + if (participants == null) { + participants = new ConcurrentHashMap<>(); + participants.put(recoveryURI, compensatorUrl); + lraParticipants.put(transaction, participants); + } else { + participants.replace(recoveryURI, compensatorUrl); + } - participants.put(recoveryURI, compensatorUrl); - - if (persist && lraId != null) { - LongRunningAction transaction = getTransaction(lraId); - + if (persist) { transaction.updateRecoveryURI(compensatorUrl, recoveryURI); } } public String getParticipant(String rcvCoordId) { - return participants.get(rcvCoordId); + for (Map compensators : lraParticipants.values()) { + String compensator = compensators.get(rcvCoordId); + + if (compensator != null) { + return compensator; + } + } + + return null; } public synchronized LongRunningAction startLRA(String baseUri, URI parentLRA, String clientId, Long timelimit) { @@ -443,11 +464,11 @@ public List getFailedLRAs() { private LRARecoveryModule getRM() { // since this method is reentrant we do not need any synchronization - if (recoveryModule != null) { - return recoveryModule; + if (recoveryModule == null) { + recoveryModule = LRARecoveryModule.getInstance(); } - return LRARecoveryModule.getInstance(); + return recoveryModule; } private List getDataByStatus(Map lrasToFilter, LRAStatus status) { diff --git a/rts/lra/coordinator/src/test/java/io/narayana/lra/coordinator/domain/model/LRATest.java b/rts/lra/coordinator/src/test/java/io/narayana/lra/coordinator/domain/model/LRATest.java index 8f5e5bd13f..10e403dff6 100644 --- a/rts/lra/coordinator/src/test/java/io/narayana/lra/coordinator/domain/model/LRATest.java +++ b/rts/lra/coordinator/src/test/java/io/narayana/lra/coordinator/domain/model/LRATest.java @@ -411,6 +411,28 @@ public void testComplete() throws URISyntaxException { assertTrue("LRA should have closed", status == null || status == LRAStatus.Closed); } + /** + * Run a loop of LRAs so that a debugger can watch memory + * @throws URISyntaxException + */ + @Test + public void testForLeaks() throws URISyntaxException { + int txnCount = 10; + // verify that the participant complete request is issued when a method annotated with @LRA returns + int completions = completeCount.get(); + + // start some LRAs + for (int i = 0; i < txnCount; i++) { + String lraId = client.target(TestPortProvider.generateURL("/base/test/start-end")).request().get(String.class); + LRAStatus status = getStatus(new URI(lraId)); + assertTrue("LRA should have closed", status == null || status == LRAStatus.Closed); + } + + // Remark: there should be no memory leaks in LRAService + + assertEquals(completions + txnCount, completeCount.get()); + } + /** * test that participants that report LRAStatus.Closing are replayed */ diff --git a/rts/lra/test/pom.xml b/rts/lra/test/pom.xml index fa729820ae..5939c374a3 100644 --- a/rts/lra/test/pom.xml +++ b/rts/lra/test/pom.xml @@ -209,6 +209,7 @@ org.wildfly.plugins wildfly-maven-plugin + ${version.wildfly-maven-plugin} ${skip.wildfly.plugin}