Skip to content

Commit 4428ede

Browse files
original-brownbearAdam Locke
authored andcommitted
Limit CS Update Task Description Size (elastic#79443)
When working with very large clusters, there's various avenues to create very large batches of tasks that can render as strings with O(MB) length. Since we only use these strings for logging and there is limited value in knowing the exact task descriptions of large numbers of tasks it seems reasonable to put a hard limit on the logging here to prevent hard to work with logs and save some memory in extreme cases.
1 parent d1013ac commit 4428ede

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88
package org.elasticsearch.cluster;
99

10+
import org.elasticsearch.common.Strings;
1011
import org.elasticsearch.core.Nullable;
1112

1213
import java.util.IdentityHashMap;
@@ -46,7 +47,12 @@ default void clusterStatePublished(ClusterStatePublicationEvent clusterStatePubl
4647
* This allows groupd task description but the submitting source.
4748
*/
4849
default String describeTasks(List<T> tasks) {
49-
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
50+
final StringBuilder output = new StringBuilder();
51+
Strings.collectionToDelimitedStringWithLimit(
52+
(Iterable<String>) () -> tasks.stream().map(Object::toString).filter(s -> s.isEmpty() == false).iterator(),
53+
", ", "", "", 1024, output
54+
);
55+
return output.toString();
5056
}
5157

5258
/**

server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.cluster.service;
1010

1111
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.core.Nullable;
1314
import org.elasticsearch.common.Priority;
1415
import org.elasticsearch.core.TimeValue;
@@ -131,14 +132,28 @@ void runIfNotProcessed(BatchedTask updateTask) {
131132
}
132133

133134
if (toExecute.isEmpty() == false) {
134-
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
135+
run(updateTask.batchingKey, toExecute, buildTasksDescription(updateTask, toExecute, processTasksBySource));
136+
}
137+
}
138+
}
139+
140+
private static final int MAX_TASK_DESCRIPTION_CHARS = 8 * 1024;
141+
142+
private String buildTasksDescription(BatchedTask updateTask,
143+
List<BatchedTask> toExecute,
144+
Map<String, List<BatchedTask>> processTasksBySource) {
145+
final StringBuilder output = new StringBuilder();
146+
Strings.collectionToDelimitedStringWithLimit(
147+
(Iterable<String>) () -> processTasksBySource.entrySet().stream().map(entry -> {
135148
String tasks = updateTask.describeTasks(entry.getValue());
136149
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
137-
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
138-
139-
run(updateTask.batchingKey, toExecute, tasksSummary);
140-
}
150+
}).filter(s -> s.isEmpty() == false).iterator(),
151+
", ", "", "", MAX_TASK_DESCRIPTION_CHARS, output
152+
);
153+
if (output.length() > MAX_TASK_DESCRIPTION_CHARS) {
154+
output.append(" (").append(toExecute.size()).append(" tasks in total)");
141155
}
156+
return output.toString();
142157
}
143158

144159
/**

0 commit comments

Comments
 (0)