Skip to content
36 changes: 25 additions & 11 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -80,22 +81,30 @@ public void close() {

@Override
public final void sync() {
if (pipelinedResponses.isEmpty()) {
return;
}

if (syncing) {
return;
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
boolean onlyOneNode = pipelinedResponses.size() == 1;
ExecutorService executorService = onlyOneNode ? null : Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
CountDownLatch countDownLatch = onlyOneNode ? null : new CountDownLatch(pipelinedResponses.size());

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {

// last node run in current thread directly
Executor executor = pipelinedResponsesIterator.hasNext() ? executorService : Runnable::run;
executor.execute(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
Expand All @@ -104,22 +113,27 @@ public final void sync() {
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
// TODO these operations not thread-safe
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
IOUtils.closeQuietly(connection);
if (!onlyOneNode) {
countDownLatch.countDown();
}
}
});
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}
if (!onlyOneNode) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();
executorService.shutdownNow();
}

syncing = false;
}
Expand Down