Skip to content

Commit 1b79b0e

Browse files
committed
Add more debugging information to rethrottles
I'm still trying to track down failures like: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+dockeralpine-periodic/1180/console It looks like a task is hanging but I'm not sure why. So this adds more logging for next time.
1 parent 305e555 commit 1b79b0e

File tree

5 files changed

+15
-21
lines changed

5 files changed

+15
-21
lines changed

core/src/main/java/org/elasticsearch/action/bulk/byscroll/WorkingBulkByScrollTask.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -211,16 +211,12 @@ private void setRequestsPerSecond(float requestsPerSecond) {
211211
@Override
212212
public void rethrottle(float newRequestsPerSecond) {
213213
synchronized (delayedPrepareBulkRequestReference) {
214-
if (logger.isDebugEnabled()) {
215-
logger.debug("[{}]: Rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
216-
}
214+
logger.debug("[{}]: rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
217215
setRequestsPerSecond(newRequestsPerSecond);
218216

219217
DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
220218
if (delayedPrepareBulkRequest == null) {
221-
if (logger.isDebugEnabled()) {
222-
logger.debug("[{}]: Skipping rescheduling because there is no scheduled task", getId());
223-
}
219+
logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId());
224220
// No request has been queued yet so nothing to reschedule.
225221
return;
226222
}
@@ -259,20 +255,16 @@ DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
259255
* The user is attempting to slow the request down. We'll let the change in throttle take effect the next time we delay
260256
* prepareBulkRequest. We can't just reschedule the request further out in the future the bulk context might time out.
261257
*/
262-
if (logger.isDebugEnabled()) {
263-
logger.debug("[{}]: Skipping rescheduling because the new throttle [{}] is slower than the old one [{}].", getId(),
264-
newRequestsPerSecond, requestsPerSecond);
265-
}
258+
logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(),
259+
newRequestsPerSecond, requestsPerSecond);
266260
return this;
267261
}
268262

269263
long remainingDelay = future.getDelay(TimeUnit.NANOSECONDS);
270264
// Actually reschedule the task
271265
if (false == FutureUtils.cancel(future)) {
272266
// Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
273-
if (logger.isDebugEnabled()) {
274-
logger.debug("[{}]: Skipping rescheduling we couldn't cancel the task.", getId());
275-
}
267+
logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", getId());
276268
return this;
277269
}
278270

@@ -281,9 +273,7 @@ DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
281273
* test it you'll find that requests sneak through. So each request is given a runOnce boolean to prevent that.
282274
*/
283275
TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
284-
if (logger.isDebugEnabled()) {
285-
logger.debug("[{}]: Rescheduling for [{}] in the future.", getId(), newDelay);
286-
}
276+
logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay);
287277
return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
288278
}
289279

core/src/test/java/org/elasticsearch/action/bulk/byscroll/WorkingBulkByScrollTaskTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void onFailure(Exception e) {
168168
}
169169
});
170170

171-
// Rethrottle on a random number of threads, on of which is this thread.
171+
// Rethrottle on a random number of threads, one of which is this thread.
172172
Runnable test = () -> {
173173
try {
174174
int rethrottles = 0;

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.reindex;
2121

22+
import org.apache.logging.log4j.Logger;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.FailedNodeException;
2425
import org.elasticsearch.action.TaskOperationFailure;
@@ -54,21 +55,22 @@ public TransportRethrottleAction(Settings settings, ThreadPool threadPool, Clust
5455

5556
@Override
5657
protected void taskOperation(RethrottleRequest request, BulkByScrollTask task, ActionListener<TaskInfo> listener) {
57-
rethrottle(clusterService.localNode().getId(), client, task, request.getRequestsPerSecond(), listener);
58+
rethrottle(logger, clusterService.localNode().getId(), client, task, request.getRequestsPerSecond(), listener);
5859
}
5960

60-
static void rethrottle(String localNodeId, Client client, BulkByScrollTask task, float newRequestsPerSecond,
61+
static void rethrottle(Logger logger, String localNodeId, Client client, BulkByScrollTask task, float newRequestsPerSecond,
6162
ActionListener<TaskInfo> listener) {
6263
int runningSubTasks = task.runningSliceSubTasks();
6364
if (runningSubTasks == 0) {
64-
// Nothing to do, all sub tasks are done
65+
logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond);
6566
task.rethrottle(newRequestsPerSecond);
6667
listener.onResponse(task.taskInfo(localNodeId, true));
6768
return;
6869
}
6970
RethrottleRequest subRequest = new RethrottleRequest();
7071
subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubTasks);
7172
subRequest.setParentTaskId(new TaskId(localNodeId, task.getId()));
73+
logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), subRequest.getRequestsPerSecond());
7274
client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap(r -> {
7375
r.rethrowFailures("Rethrottle");
7476
listener.onResponse(task.getInfoGivenSliceInfo(localNodeId, r.getTasks()));

modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.ingest.IngestTestPlugin;
3838
import org.elasticsearch.plugins.Plugin;
3939
import org.elasticsearch.tasks.TaskInfo;
40+
import org.elasticsearch.test.junit.annotations.TestLogging;
4041
import org.hamcrest.Matcher;
4142
import org.junit.Before;
4243

@@ -59,6 +60,7 @@
5960
* different cancellation places - that is the responsibility of AsyncBulkByScrollActionTests which have more precise control to
6061
* simulate failures but do not exercise important portion of the stack like transport and task management.
6162
*/
63+
@TestLogging("org.elasticsearch.action.bulk.byscroll:DEBUG,org.elasticsearch.index.reindex:DEBUG")
6264
public class CancelTests extends ReindexTestCase {
6365

6466
protected static final String INDEX = "reindex-cancel-index";

modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private void rethrottleTestCase(int runningSlices, Consumer<ActionListener<ListT
7373
@SuppressWarnings("unchecked")
7474
ActionListener<TaskInfo> listener = mock(ActionListener.class);
7575

76-
TransportRethrottleAction.rethrottle(localNodeId, client, task, newRequestsPerSecond, listener);
76+
TransportRethrottleAction.rethrottle(logger, localNodeId, client, task, newRequestsPerSecond, listener);
7777

7878
// Capture the sub request and the listener so we can verify they are sane
7979
ArgumentCaptor<RethrottleRequest> subRequest = ArgumentCaptor.forClass(RethrottleRequest.class);

0 commit comments

Comments
 (0)