Skip to content
34 changes: 24 additions & 10 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -85,17 +86,25 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
boolean multiNode = pipelinedResponses.size() > 1;
Executor executor;
ExecutorService executorService = null;
if (multiNode) {
executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
executor = executorService;
} else {
executor = Runnable::run;
}
CountDownLatch countDownLatch = multiNode ? new CountDownLatch(pipelinedResponses.size()) : null;

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
executor.execute(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
Expand All @@ -104,22 +113,27 @@ public final void sync() {
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
// TODO these operations not thread-safe and when executed here, the iter may moved
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
if (multiNode) {
countDownLatch.countDown();
}
}
});
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}
if (multiNode) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();
executorService.shutdownNow();
}

syncing = false;
}
Expand Down
Loading