diff --git a/docs/changelog/120087.yaml b/docs/changelog/120087.yaml new file mode 100644 index 0000000000000..8539640809b04 --- /dev/null +++ b/docs/changelog/120087.yaml @@ -0,0 +1,5 @@ +pr: 120087 +summary: Include `clusterApplyListener` in long cluster apply warnings +area: Cluster Coordination +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index c34d0d19988c8..05d4b29f8f28f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -158,6 +158,31 @@ public void run() { } } + private record TimedListener(ActionListener listener, Recorder recorder) implements ActionListener { + + @Override + public void onResponse(Void response) { + try (Releasable ignored = recorder.record("listener.onResponse")) { + listener.onResponse(null); + } catch (Exception e) { + assert false : e; + logger.error("exception thrown by listener.onResponse", e); + } + } + + @Override + public void onFailure(Exception e) { + assert e != null; + try (Releasable ignored = recorder.record("listener.onFailure")) { + listener.onFailure(e); + } catch (Exception inner) { + e.addSuppressed(inner); + assert false : e; + logger.error(() -> "exception thrown by listener.onFailure", e); + } + } + } + @Override protected synchronized void doStop() { for (Map.Entry onGoingTimeout : timeoutClusterStateListeners.entrySet()) { @@ -394,12 +419,14 @@ private void runTask(String source, Function updateF final long startTimeMillis = threadPool.relativeTimeInMillis(); final Recorder stopWatch = new Recorder(threadPool, slowTaskThreadDumpTimeout); + final TimedListener timedListener = new TimedListener(clusterApplyListener, stopWatch); final ClusterState newClusterState; try { try (Releasable ignored = stopWatch.record("running task [" + source + ']')) { newClusterState = updateFunction.apply(previousClusterState); } } catch (Exception e) { + timedListener.onFailure(e); TimeValue executionTime = getTimeSince(startTimeMillis); logger.trace( () -> format( @@ -412,15 +439,14 @@ private void runTask(String source, Function updateF e ); warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch); - clusterApplyListener.onFailure(e); return; } if (previousClusterState == newClusterState) { + timedListener.onResponse(null); TimeValue executionTime = getTimeSince(startTimeMillis); logger.debug("processing [{}]: took [{}] no change in cluster state", source, executionTime); warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch); - clusterApplyListener.onResponse(null); } else { if (logger.isTraceEnabled()) { logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), source, newClusterState); @@ -430,6 +456,7 @@ private void runTask(String source, Function updateF try { setIsApplyingClusterState(); applyChanges(previousClusterState, newClusterState, source, stopWatch); + timedListener.onResponse(null); TimeValue executionTime = getTimeSince(startTimeMillis); logger.debug( "processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", @@ -439,8 +466,11 @@ private void runTask(String source, Function updateF newClusterState.stateUUID() ); warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch); - clusterApplyListener.onResponse(null); } catch (Exception e) { + // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we + // continue we will retry with the same cluster state but that might not help. + assert applicationMayFail(); + timedListener.onFailure(e); TimeValue executionTime = getTimeSince(startTimeMillis); if (logger.isTraceEnabled()) { logger.warn(() -> format(""" @@ -460,10 +490,6 @@ private void runTask(String source, Function updateF e ); } - // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we - // continue we will retry with the same cluster state but that might not help. - assert applicationMayFail(); - clusterApplyListener.onFailure(e); } finally { clearIsApplyingClusterState(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 7c1c954e7b4e9..e6f50ef42365e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -66,6 +66,12 @@ public long relativeTimeInMillis() { assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)); return currentTimeMillis; } + + @Override + public long rawRelativeTimeInMillis() { + assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)); + return currentTimeMillis; + } }; clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); allowClusterStateApplicationFailure = false; @@ -207,15 +213,33 @@ public void testLongClusterStateUpdateLogging() throws Exception { ); mockLog.addExpectation( new MockLog.SeenEventExpectation( - "test4", + "test3", ClusterApplierService.class.getCanonicalName(), Level.WARN, "*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: " + "[running task [test3]] took [*" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "test4", + ClusterApplierService.class.getCanonicalName(), + Level.WARN, + "*cluster state applier task [test4] took [36s] which is above the warn threshold of [*]: " + + "[running task [test4]] took [*" + ) + ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "test5", + ClusterApplierService.class.getCanonicalName(), + Level.WARN, + "*cluster state applier task [test5] took [38s] which is above the warn threshold of [*]: " + + "[running task [test5]] took [*" + ) + ); - final CountDownLatch latch = new CountDownLatch(4); + final CountDownLatch latch = new CountDownLatch(6); final CountDownLatch processedFirstTask = new CountDownLatch(1); currentTimeMillis = randomLongBetween(0L, Long.MAX_VALUE / 2); clusterApplierService.runOnApplierThread( @@ -266,9 +290,39 @@ public void onFailure(Exception e) { } } ); + clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> { + // do nothing (testing that onResponse is included in timing) + }, new ActionListener<>() { + + @Override + public void onResponse(Void unused) { + advanceTime(TimeValue.timeValueSeconds(36).millis()); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + clusterApplierService.runOnApplierThread("test5", Priority.HIGH, currentState -> { + throw new IllegalArgumentException("Testing that onFailure is included in timing"); + }, new ActionListener<>() { + + @Override + public void onResponse(Void unused) { + fail(); + } + + @Override + public void onFailure(Exception e) { + advanceTime(TimeValue.timeValueSeconds(38).millis()); + latch.countDown(); + } + }); // Additional update task to make sure all previous logging made it to the loggerName // We don't check logging for this on since there is no guarantee that it will occur before our check - clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> {}, new ActionListener<>() { + clusterApplierService.runOnApplierThread("test6", Priority.HIGH, currentState -> {}, new ActionListener<>() { @Override public void onResponse(Void ignored) { latch.countDown();