Snapshot: Migrate TransportRequestHandler to TransportMasterNodeAction #27165
Changes from 9 commits
1f6bf50
37b8050
82a98ef
97c4724
82aedcc
06292d1
ea7ec38
cb73eea
85da45d
57daa0d
org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -23,14 +23,24 @@ | |
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.apache.logging.log4j.util.Supplier; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.Version; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.ActionRequestValidationException; | ||
| import org.elasticsearch.action.ActionResponse; | ||
| import org.elasticsearch.action.support.ActionFilters; | ||
| import org.elasticsearch.action.support.master.MasterNodeRequest; | ||
| import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
| import org.elasticsearch.cluster.ClusterChangedEvent; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.ClusterStateApplier; | ||
| import org.elasticsearch.cluster.ClusterStateListener; | ||
| import org.elasticsearch.cluster.ClusterStateTaskConfig; | ||
| import org.elasticsearch.cluster.ClusterStateTaskExecutor; | ||
| import org.elasticsearch.cluster.ClusterStateTaskListener; | ||
| import org.elasticsearch.cluster.SnapshotsInProgress; | ||
| import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; | ||
| import org.elasticsearch.cluster.SnapshotsInProgress.State; | ||
| import org.elasticsearch.cluster.block.ClusterBlockException; | ||
| import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.Nullable; | ||
|
|
@@ -85,9 +95,11 @@ | |
| * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for | ||
| * starting and stopping shard level snapshots | ||
| */ | ||
| public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener { | ||
| public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { | ||
|
|
||
| public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; | ||
| public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; | ||
|
|
||
| public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; | ||
|
|
||
| private final ClusterService clusterService; | ||
|
|
||
|
|
@@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements | |
| private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap(); | ||
|
|
||
| private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); | ||
| private UpdateSnapshotStatusAction updateSnapshotStatusHandler; | ||
|
|
||
| @Inject | ||
| public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, | ||
| TransportService transportService, IndicesService indicesService) { | ||
| TransportService transportService, IndicesService indicesService, | ||
| ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { | ||
| super(settings); | ||
| this.indicesService = indicesService; | ||
| this.snapshotsService = snapshotsService; | ||
|
|
@@ -118,20 +132,27 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S | |
| this.threadPool = threadPool; | ||
| if (DiscoveryNode.isDataNode(settings)) { | ||
| // this is only useful on the nodes that can hold data | ||
| // addLowPriorityApplier to make sure that Repository will be created before snapshot | ||
| clusterService.addLowPriorityApplier(this); | ||
| clusterService.addListener(this); | ||
| } | ||
|
|
||
| // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. | ||
| this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, | ||
| transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); | ||
|
|
||
| if (DiscoveryNode.isMasterNode(settings)) { | ||
| // This needs to run only on nodes that can become masters | ||
| transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler()); | ||
| transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| protected void doStart() { | ||
|
|
||
| assert this.updateSnapshotStatusHandler != null; | ||
| assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; | ||
| if (DiscoveryNode.isMasterNode(settings)) { | ||
| assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -151,11 +172,11 @@ protected void doStop() { | |
|
|
||
| @Override | ||
| protected void doClose() { | ||
| clusterService.removeApplier(this); | ||
| clusterService.removeListener(this); | ||
| } | ||
|
|
||
| @Override | ||
| public void applyClusterState(ClusterChangedEvent event) { | ||
| public void clusterChanged(ClusterChangedEvent event) { | ||
| try { | ||
| SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); | ||
| SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); | ||
|
|
@@ -449,7 +470,7 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) { | |
| /** | ||
| * Internal request that is used to send changes in snapshot status to master | ||
| */ | ||
| public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { | ||
| public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> { | ||
| private Snapshot snapshot; | ||
| private ShardId shardId; | ||
| private ShardSnapshotStatus status; | ||
|
|
@@ -462,6 +483,13 @@ public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, | |
| this.snapshot = snapshot; | ||
| this.shardId = shardId; | ||
| this.status = status; | ||
| // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. | ||
| this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); | ||
| } | ||
|
|
||
| @Override | ||
| public ActionRequestValidationException validate() { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -502,11 +530,16 @@ public String toString() { | |
| * Updates the shard status | ||
| */ | ||
| public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { | ||
| UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); | ||
| try { | ||
| transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); | ||
| if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { | ||
| UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); | ||
| transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); | ||
| } else { | ||
| UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); | ||
| transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); | ||
| } | ||
| } catch (Exception e) { | ||
| logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); | ||
| logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -515,15 +548,24 @@ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, S | |
| * | ||
| * @param request update shard status request | ||
| */ | ||
| private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { | ||
| private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) { | ||
| logger.trace("received updated snapshot restore state [{}]", request); | ||
| clusterService.submitStateUpdateTask( | ||
| "update snapshot state", | ||
| request, | ||
| ClusterStateTaskConfig.build(Priority.NORMAL), | ||
| snapshotStateExecutor, | ||
| (source, e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", | ||
| request.snapshot(), request.shardId(), request.status()), e)); | ||
| new ClusterStateTaskListener() { | ||
| @Override | ||
| public void onFailure(String source, Exception e) { | ||
| listener.onFailure(e); | ||
| } | ||
|
|
||
| @Override | ||
| public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
| listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> { | ||
|
|
@@ -578,13 +620,107 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster | |
| } | ||
| } | ||
|
|
||
| static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { | ||
|
|
||
| } | ||
|
|
||
| class UpdateSnapshotStatusAction extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> { | ||
| UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, | ||
| ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { | ||
| super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new); | ||
| } | ||
|
|
||
| @Override | ||
| protected String executor() { | ||
| return ThreadPool.Names.SAME; | ||
| } | ||
|
|
||
| @Override | ||
| protected UpdateIndexShardSnapshotStatusResponse newResponse() { | ||
| return new UpdateIndexShardSnapshotStatusResponse(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception { | ||
| innerUpdateSnapshotState(request, listener); | ||
| } | ||
|
|
||
| @Override | ||
| protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} | ||
| */ | ||
| static class UpdateSnapshotStatusRequestV6 extends TransportRequest { | ||
| private Snapshot snapshot; | ||
| private ShardId shardId; | ||
| private ShardSnapshotStatus status; | ||
|
|
||
| UpdateSnapshotStatusRequestV6() { | ||
|
|
||
| } | ||
|
|
||
| UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { | ||
| this.snapshot = snapshot; | ||
| this.shardId = shardId; | ||
| this.status = status; | ||
| } | ||
|
|
||
| @Override | ||
| public void readFrom(StreamInput in) throws IOException { | ||
| super.readFrom(in); | ||
| snapshot = new Snapshot(in); | ||
| shardId = ShardId.readShardId(in); | ||
| status = new ShardSnapshotStatus(in); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| super.writeTo(out); | ||
| snapshot.writeTo(out); | ||
| shardId.writeTo(out); | ||
| status.writeTo(out); | ||
| } | ||
|
|
||
| Snapshot snapshot() { | ||
| return snapshot; | ||
| } | ||
|
|
||
| ShardId shardId() { | ||
| return shardId; | ||
| } | ||
|
|
||
| ShardSnapshotStatus status() { | ||
| return status; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Transport request handler that is used to send changes in snapshot status to master | ||
| * A BWC version of {@link UpdateSnapshotStatusAction} | ||
| */ | ||
| class UpdateSnapshotStateRequestHandler implements TransportRequestHandler<UpdateIndexShardSnapshotStatusRequest> { | ||
| class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> { | ||
| @Override | ||
| public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception { | ||
| innerUpdateSnapshotState(request); | ||
| public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { | ||
| final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); | ||
|
Review comment: again, masterNodeTimeout?

Reply: Done
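As a rough illustration of what "Done" might correspond to here — this is a hypothetical sketch, not the actual follow-up commit — the request built from the BWC message could be given the same effectively infinite master-node timeout that the `UpdateIndexShardSnapshotStatusRequest` constructor sets for the non-BWC path:

```java
// Hypothetical sketch: explicitly carry an effectively infinite master node timeout over
// to the request built from the BWC (V6) message, matching the new request's default.
final UpdateIndexShardSnapshotStatusRequest request =
    new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status());
request.masterNodeTimeout(TimeValue.timeValueNanos(Long.MAX_VALUE));
```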
||
| innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() { | ||
| @Override | ||
| public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| logger.warn("Failed to update snapshot status", e); | ||
| } | ||
| }); | ||
| channel.sendResponse(TransportResponse.Empty.INSTANCE); | ||
| } | ||
| } | ||
|
|
||
org/elasticsearch/snapshots/SnapshotShardsServiceIT.java (new file)
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.snapshots; | ||
|
|
||
| import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.unit.ByteSizeUnit; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.snapshots.mockstore.MockRepository; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
| import org.elasticsearch.test.disruption.NetworkDisruption; | ||
| import org.elasticsearch.test.transport.MockTransportService; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.everyItem; | ||
| import static org.hamcrest.Matchers.hasSize; | ||
|
|
||
| @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) | ||
| public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase { | ||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
| return Arrays.asList(MockRepository.Plugin.class, MockTransportService.TestPlugin.class); | ||
| } | ||
|
|
||
| public void testRetryPostingSnapshotStatusMessages() throws Exception { | ||
| String masterNode = internalCluster().startMasterOnlyNode(); | ||
| String dataNode = internalCluster().startDataOnlyNode(); | ||
|
|
||
| logger.info("--> creating repository"); | ||
| assertAcked(client().admin().cluster().preparePutRepository("test-repo") | ||
| .setType("mock").setSettings(Settings.builder() | ||
| .put("location", randomRepoPath()) | ||
| .put("compress", randomBoolean()) | ||
| .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); | ||
|
|
||
| final int shards = between(1, 10); | ||
| assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0))); | ||
| ensureGreen(); | ||
| final int numDocs = scaledRandomIntBetween(50, 100); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| index("test-index", "doc", Integer.toString(i)); | ||
| } | ||
|
|
||
| logger.info("--> blocking repository"); | ||
| String blockedNode = blockNodeWithIndex("test-repo", "test-index"); | ||
| dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") | ||
| .setWaitForCompletion(false) | ||
| .setIndices("test-index") | ||
| .get(); | ||
| waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); | ||
|
|
||
| final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap") | ||
| .get().getSnapshots().get(0).snapshotId(); | ||
|
|
||
| logger.info("--> start disrupting cluster"); | ||
| final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode), | ||
| NetworkDisruption.NetworkDelay.random(random())); | ||
| internalCluster().setDisruptionScheme(networkDisruption); | ||
| networkDisruption.startDisrupting(); | ||
|
|
||
| logger.info("--> unblocking repository"); | ||
| unblockNode("test-repo", blockedNode); | ||
|
|
||
| // Retrieve snapshot status from the data node. | ||
| SnapshotShardsService snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, blockedNode); | ||
| assertBusy(() -> { | ||
| final Snapshot snapshot = new Snapshot("test-repo", snapshotId); | ||
| List<IndexShardSnapshotStatus.Stage> stages = snapshotShardsService.currentSnapshotShards(snapshot) | ||
| .values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList()); | ||
| assertThat(stages, hasSize(shards)); | ||
| assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE))); | ||
| }); | ||
|
|
||
| logger.info("--> stop disrupting cluster"); | ||
| networkDisruption.stopDisrupting(); | ||
| internalCluster().clearDisruptionScheme(true); | ||
|
Review comment: this is usually done by calling …
||
|
|
||
| assertBusy(() -> { | ||
| GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() | ||
| .prepareGetSnapshots("test-repo") | ||
| .setSnapshots("test-snap").get(); | ||
| SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); | ||
| logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards()); | ||
| assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); | ||
| assertThat(snapshotInfo.successfulShards(), equalTo(shards)); | ||
| }, 10, TimeUnit.SECONDS); | ||
| } | ||
| } | ||
Review comment: Could you add a comment here why this needs to be a listener and not an applier, and why it's OK to have this as `addListener` and not `addLast`?

Reply: The test `DedicatedClusterSnapshotRestoreIT#testSnapshotWithStuckNode` failed with an `Applier` but passed with a `Listener`. However, I don't really know the root cause.

Reply: @imotov, I think I have figured out the root cause. A `Listener` is called after the new state is set, while an `Applier` is called before that. In `SnapshotShardsService`, we call `#syncShardStatsOnNewMaster` -> `#updateIndexShardSnapshotStatus`, which in turn uses a `TransportMasterNodeAction`. The `TransportMasterNodeAction` accesses the cluster state directly, which may not be available yet if it is invoked from the `Applier`. This explains why `#testSnapshotWithStuckNode` failed with an `Applier` but passed with a `Listener`.
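To make the ordering argument above concrete, here is a minimal, simplified sketch of the difference (the class and method names below are illustrative stand-ins, not the real `ClusterService` internals): appliers are invoked before the locally visible cluster state is swapped to the new one, listeners only after, so code that reads the applied state from inside an applier can still observe the old state.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of applier vs. listener ordering (not Elasticsearch code).
class ClusterStateOrderingSketch {
    interface Applier { void applyClusterState(Object event); }   // stand-in for ClusterStateApplier
    interface Listener { void clusterChanged(Object event); }     // stand-in for ClusterStateListener

    private final AtomicReference<Object> visibleState = new AtomicReference<>();
    private final List<Applier> appliers;
    private final List<Listener> listeners;

    ClusterStateOrderingSketch(List<Applier> appliers, List<Listener> listeners) {
        this.appliers = appliers;
        this.listeners = listeners;
    }

    // What a TransportMasterNodeAction-like consumer would read via clusterService.state().
    Object state() {
        return visibleState.get();
    }

    void applyNewState(Object newState, Object event) {
        appliers.forEach(a -> a.applyClusterState(event));  // state() still returns the OLD state here
        visibleState.set(newState);                          // the new state becomes visible
        listeners.forEach(l -> l.clusterChanged(event));     // state() now returns the NEW state
    }
}
```

Under that model, moving `SnapshotShardsService` from `applyClusterState` to `clusterChanged` means the `TransportMasterNodeAction` it triggers only ever sees the already-committed state.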