Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions core/src/main/java/org/testcontainers/utility/ResourceReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -49,6 +50,11 @@ public final class ResourceReaper {
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceReaper.class);

private static final List<List<Map.Entry<String, String>>> DEATH_NOTE = new ArrayList<>();
private static final RateLimiter RYUK_ACK_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(4, TimeUnit.SECONDS)
.withConstantThroughput()
.build();

private static ResourceReaper instance;
private final DockerClient dockerClient;
Expand Down Expand Up @@ -110,34 +116,36 @@ public static String start(String hostIpAddress, DockerClient client) {
DockerClientFactory.TESTCONTAINERS_THREAD_GROUP,
() -> {
while (true) {
int index = 0;
try(Socket clientSocket = new Socket(hostIpAddress, ryukPort)) {
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());

synchronized (DEATH_NOTE) {
while (true) {
if (DEATH_NOTE.size() <= index) {
try {
DEATH_NOTE.wait(1_000);
continue;
} catch (InterruptedException e) {
throw new RuntimeException(e);
RYUK_ACK_RATE_LIMITER.doWhenReady(() -> {
int index = 0;
try(Socket clientSocket = new Socket(hostIpAddress, ryukPort)) {
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());

synchronized (DEATH_NOTE) {
while (true) {
if (DEATH_NOTE.size() <= index) {
try {
DEATH_NOTE.wait(1_000);
continue;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);
boolean isAcknowledged = registry.register(filters);
if (isAcknowledged) {
log.debug("Received 'ACK' from Ryuk");
ryukScheduledLatch.countDown();
index++;
} else {
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
}
}
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);
boolean isAcknowledged = registry.register(filters);
if (isAcknowledged) {
log.debug("Received 'ACK' from Ryuk");
ryukScheduledLatch.countDown();
index++;
} else {
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
}
}
} catch (IOException e) {
log.warn("Can not connect to Ryuk at {}:{}", hostIpAddress, ryukPort, e);
}
} catch (IOException e) {
log.warn("Can not connect to Ryuk at {}:{}", hostIpAddress, ryukPort, e);
}
});
}
},
"testcontainers-ryuk"
Expand Down Expand Up @@ -341,12 +349,12 @@ public void unregisterNetwork(String identifier) {
public void unregisterContainer(String identifier) {
registeredContainers.remove(identifier);
}

public void registerImageForCleanup(String dockerImageName) {
setHook();
registeredImages.add(dockerImageName);
}

private void removeImage(String dockerImageName) {
LOGGER.trace("Removing image tagged {}", dockerImageName);
try {
Expand Down