Skip to content

Commit

Permalink
JBTM-3795 recover memory when an LRA terminates
Browse files Browse the repository at this point in the history
  • Loading branch information
mmusgrov committed Jan 2, 2024
1 parent 3f6fb54 commit 9308de3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class LRAService {
private final Map<URI, LongRunningAction> lras = new ConcurrentHashMap<>();
private final Map<URI, LongRunningAction> recoveringLRAs = new ConcurrentHashMap<>();
private final Map<URI, ReentrantLock> locks = new ConcurrentHashMap<>();
private final Map<String, String> participants = new ConcurrentHashMap<>();
private final Map<LongRunningAction, Map<String, String>> lraParticipants = new ConcurrentHashMap<>();
private LRARecoveryModule recoveryModule;

public LongRunningAction getTransaction(URI lraId) throws NotFoundException {
Expand Down Expand Up @@ -211,7 +211,11 @@ public void remove(LongRunningAction lra) {
public void remove(URI lraId) {
lraTrace(lraId, "remove LRA");

lras.remove(lraId);
LongRunningAction lra = lras.remove(lraId);

if (lra != null) {
lraParticipants.remove(lra);
}

recoveringLRAs.remove(lraId);

Expand All @@ -225,18 +229,35 @@ public void recover() {
public void updateRecoveryURI(URI lraId, String compensatorUrl, String recoveryURI, boolean persist) {
assert recoveryURI != null;
assert compensatorUrl != null;
LongRunningAction transaction = getTransaction(lraId);
Map<String, String> participants = lraParticipants.get(transaction);

// the <participants> collection should be thread safe against update requests, even though such concurrent
// updates are improbable because only LRAService.joinLRA and RecoveryCoordinator.replaceCompensator
// do updates but those are sequential operations anyway
if (participants == null) {
participants = new ConcurrentHashMap<>();
participants.put(recoveryURI, compensatorUrl);
lraParticipants.put(transaction, participants);
} else {
participants.replace(recoveryURI, compensatorUrl);
}

participants.put(recoveryURI, compensatorUrl);

if (persist && lraId != null) {
LongRunningAction transaction = getTransaction(lraId);

if (persist) {
transaction.updateRecoveryURI(compensatorUrl, recoveryURI);
}
}

public String getParticipant(String rcvCoordId) {
return participants.get(rcvCoordId);
for (Map<String, String> compensators : lraParticipants.values()) {
String compensator = compensators.get(rcvCoordId);

if (compensator != null) {
return compensator;
}
}

return null;
}

public synchronized LongRunningAction startLRA(String baseUri, URI parentLRA, String clientId, Long timelimit) {
Expand Down Expand Up @@ -443,11 +464,11 @@ public List<LRAData> getFailedLRAs() {

private LRARecoveryModule getRM() {
// since this method is reentrant we do not need any synchronization
if (recoveryModule != null) {
return recoveryModule;
if (recoveryModule == null) {
recoveryModule = LRARecoveryModule.getInstance();
}

return LRARecoveryModule.getInstance();
return recoveryModule;
}

private List<LRAData> getDataByStatus(Map<URI, LongRunningAction> lrasToFilter, LRAStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,28 @@ public void testComplete() throws URISyntaxException {
assertTrue("LRA should have closed", status == null || status == LRAStatus.Closed);
}

/**
* Run a loop of LRAs so that a debugger can watch memory
* @throws URISyntaxException
*/
@Test
public void testForLeaks() throws URISyntaxException {
int txnCount = 10;
// verify that the participant complete request is issued when a method annotated with @LRA returns
int completions = completeCount.get();

// start some LRAs
for (int i = 0; i < txnCount; i++) {
String lraId = client.target(TestPortProvider.generateURL("/base/test/start-end")).request().get(String.class);
LRAStatus status = getStatus(new URI(lraId));
assertTrue("LRA should have closed", status == null || status == LRAStatus.Closed);
}

// Remark: there should be no memory leaks in LRAService

assertEquals(completions + txnCount, completeCount.get());
}

/**
* test that participants that report LRAStatus.Closing are replayed
*/
Expand Down
1 change: 1 addition & 0 deletions rts/lra/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
<plugin>
<groupId>org.wildfly.plugins</groupId>
<artifactId>wildfly-maven-plugin</artifactId>
<version>${version.wildfly-maven-plugin}</version>
<configuration>
<skip>${skip.wildfly.plugin}</skip>
</configuration>
Expand Down

0 comments on commit 9308de3

Please sign in to comment.