Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120087.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120087
summary: Include `clusterApplyListener` in long cluster apply warnings
area: Cluster Coordination
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,31 @@ public void run() {
}
}

private record TimedListener(ActionListener<Void> listener, Recorder recorder) implements ActionListener<Void> {

@Override
public void onResponse(Void response) {
try (Releasable ignored = recorder.record("listener.onResponse")) {
listener.onResponse(null);
} catch (Exception e) {
assert false : e;
logger.error("exception thrown by listener.onResponse", e);
}
}

@Override
public void onFailure(Exception e) {
assert e != null;
try (Releasable ignored = recorder.record("listener.onFailure")) {
listener.onFailure(e);
} catch (Exception inner) {
e.addSuppressed(inner);
assert false : e;
logger.error(() -> "exception thrown by listener.onFailure", e);
}
}
}

@Override
protected synchronized void doStop() {
for (Map.Entry<TimeoutClusterStateListener, NotifyTimeout> onGoingTimeout : timeoutClusterStateListeners.entrySet()) {
Expand Down Expand Up @@ -394,12 +419,14 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF

final long startTimeMillis = threadPool.relativeTimeInMillis();
final Recorder stopWatch = new Recorder(threadPool, slowTaskThreadDumpTimeout);
final TimedListener timedListener = new TimedListener(clusterApplyListener, stopWatch);
final ClusterState newClusterState;
try {
try (Releasable ignored = stopWatch.record("running task [" + source + ']')) {
newClusterState = updateFunction.apply(previousClusterState);
}
} catch (Exception e) {
timedListener.onFailure(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the listener throws, the following code will not run. Probably shouldn't happen in practice. But maybe still worthwhile to wrap in try-finally?

I also wonder whether we should add more details in the log message for the time spent on applying the cluster and calling the listener?

Copy link
Contributor Author

@nicktindall nicktindall Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TimedListener will measure the time spent in the listener. e.g. from the tests

cluster state applier task [test4] took [36s] which is above the warn threshold of [30s]: [running task [test4]] took [0ms], [listener.onResponse] took [36000ms]

Is that what you meant by "time spent ... calling the listener"?

I will look at using one of the ActionListener.... utils or base classes to make exception handling more robust.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Yang is right, we just need a try/finally here. TBH it's an error for this listener to throw, we should probably assert that too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no real need to propagate the exception to the caller either, it just bubbles up to the unhandled exception handler which logs it and drops it. We may as well catch and log (and assert) in TimedListener.

Copy link
Contributor Author

@nicktindall nicktindall Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClusterApplyActionListener already wraps the provided listener to prevent onFailure exceptions from propagating, but I added similar logic to TimedListener#onFailure just in case that changes and we end up with a less safe listener being passed in.

TimeValue executionTime = getTimeSince(startTimeMillis);
logger.trace(
() -> format(
Expand All @@ -412,15 +439,14 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
e
);
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
clusterApplyListener.onFailure(e);
return;
}

if (previousClusterState == newClusterState) {
timedListener.onResponse(null);
TimeValue executionTime = getTimeSince(startTimeMillis);
logger.debug("processing [{}]: took [{}] no change in cluster state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
clusterApplyListener.onResponse(null);
} else {
if (logger.isTraceEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), source, newClusterState);
Expand All @@ -430,6 +456,7 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
try {
setIsApplyingClusterState();
applyChanges(previousClusterState, newClusterState, source, stopWatch);
timedListener.onResponse(null);
TimeValue executionTime = getTimeSince(startTimeMillis);
logger.debug(
"processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})",
Expand All @@ -439,8 +466,11 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
newClusterState.stateUUID()
);
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
clusterApplyListener.onResponse(null);
} catch (Exception e) {
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
// continue we will retry with the same cluster state but that might not help.
assert applicationMayFail();
timedListener.onFailure(e);
TimeValue executionTime = getTimeSince(startTimeMillis);
if (logger.isTraceEnabled()) {
logger.warn(() -> format("""
Expand All @@ -460,10 +490,6 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
e
);
}
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
// continue we will retry with the same cluster state but that might not help.
assert applicationMayFail();
clusterApplyListener.onFailure(e);
} finally {
clearIsApplyingClusterState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public long relativeTimeInMillis() {
assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
return currentTimeMillis;
}

@Override
public long rawRelativeTimeInMillis() {
    // Check that the clock is being read from a thread whose name contains the cluster applier thread name,
    // then return the time controlled by the test.
    final String callingThreadName = Thread.currentThread().getName();
    assertThat(callingThreadName, containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
    return currentTimeMillis;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that we use rawRelativeTimeInMillis in the Recorder, but the threshold is on the order of seconds. I wonder if we could use relativeTimeInMillis instead (to reduce the calls to System.nanoTime()).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if the cluster is under heavy GC or the cachedTimer thread is experiencing starvation, the elapsed calculation will be off if we use the cached timer. This can be detected by the log message "timer thread slept ...". But it's a bit indirect. Not sure if that was the intention behind using rawRelativeTimeInMillis, or maybe I am just trying too hard to explain it. I noticed we use it in places like MasterService, In/OutBoundHandlers and HttpTransport etc., but not in places such as PersistedClusterStateService or FsHealthService. Not sure whether those are intentional or accidental either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cached timer only updates every 200ms or so, and although the total threshold is many seconds in length many of the individual steps we record should take much less than 200ms. It's not a huge deal to call System::nanoTime here.

};
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
allowClusterStateApplicationFailure = false;
Expand Down Expand Up @@ -207,15 +213,33 @@ public void testLongClusterStateUpdateLogging() throws Exception {
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"test4",
"test3",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: "
+ "[running task [test3]] took [*"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"test4",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"*cluster state applier task [test4] took [36s] which is above the warn threshold of [*]: "
+ "[running task [test4]] took [*"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"test5",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"*cluster state applier task [test5] took [38s] which is above the warn threshold of [*]: "
+ "[running task [test5]] took [*"
)
);

final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch latch = new CountDownLatch(6);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
currentTimeMillis = randomLongBetween(0L, Long.MAX_VALUE / 2);
clusterApplierService.runOnApplierThread(
Expand Down Expand Up @@ -266,9 +290,39 @@ public void onFailure(Exception e) {
}
}
);
clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> {
// do nothing (testing that onResponse is included in timing)
}, new ActionListener<>() {

@Override
public void onResponse(Void unused) {
advanceTime(TimeValue.timeValueSeconds(36).millis());
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
}
});
clusterApplierService.runOnApplierThread("test5", Priority.HIGH, currentState -> {
throw new IllegalArgumentException("Testing that onFailure is included in timing");
}, new ActionListener<>() {

@Override
public void onResponse(Void unused) {
fail();
}

@Override
public void onFailure(Exception e) {
advanceTime(TimeValue.timeValueSeconds(38).millis());
latch.countDown();
}
});
// Additional update task to make sure all previous logging made it to the loggerName
// We don't check logging for this one since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> {}, new ActionListener<>() {
clusterApplierService.runOnApplierThread("test6", Priority.HIGH, currentState -> {}, new ActionListener<>() {
@Override
public void onResponse(Void ignored) {
latch.countDown();
Expand Down