Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/140791.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 140791
summary: Fix readiness edge case on startup
area: Infra/Node Lifecycle
type: bug
issues:
- 136955
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,6 @@ tests:
- class: org.elasticsearch.xpack.inference.DefaultEndPointsIT
method: testMultipleInferencesTriggeringDownloadAndDeploy
issue: https://github.com/elastic/elasticsearch/issues/117208
- class: org.elasticsearch.readiness.ReadinessClusterIT
method: testReadinessDuringRestartsNormalOrder
issue: https://github.com/elastic/elasticsearch/issues/136955
- class: org.elasticsearch.smoketest.SmokeTestIngestWithAllDepsClientYamlTestSuiteIT
method: test {yaml=ingest/100_sampling_with_reroute/Test get sample with multiple reroutes}
issue: https://github.com/elastic/elasticsearch/issues/137457
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand All @@ -42,9 +43,11 @@
public class ReadinessService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(ReadinessService.class);

private final ClusterService clusterService;
private final Environment environment;
private final CheckedSupplier<ServerSocketChannel, IOException> socketChannelFactory;

private volatile ClusterState lastClusterState = null;
private volatile boolean active; // false;
private volatile ServerSocketChannel serverChannel;
// package private for testing
Expand All @@ -65,9 +68,9 @@ public ReadinessService(ClusterService clusterService, Environment environment)
CheckedSupplier<ServerSocketChannel, IOException> socketChannelFactory
) {
this.serverChannel = null;
this.clusterService = clusterService;
this.environment = environment;
this.socketChannelFactory = socketChannelFactory;
clusterService.addListener(this);
}

// package private for testing
Expand Down Expand Up @@ -154,8 +157,13 @@ ServerSocketChannel setupSocket() {

@Override
protected void doStart() {
// Mark the service as active, we'll start the listener when ES is ready
// Mark the service as active, we'll start the tcp listener when ES is ready
this.active = true;
if (clusterService.lifecycleState() == Lifecycle.State.STARTED) {
this.lastClusterState = clusterService.state();
checkReadyState(null, lastClusterState);
}
clusterService.addListener(this);
}

// package private for testing
Expand Down Expand Up @@ -225,9 +233,14 @@ protected void doClose() {}

@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState clusterState = event.state();
checkReadyState(lastClusterState, event.state());
this.lastClusterState = event.state();
}

private void checkReadyState(ClusterState previousState, ClusterState clusterState) {
Set<String> shutdownNodeIds = PluginShutdownService.shutdownNodes(clusterState);
boolean shuttingDown = shutdownNodeIds.contains(clusterState.nodes().getLocalNodeId());
String localNodeId = clusterState.nodes().getLocalNodeId();
boolean shuttingDown = localNodeId != null && shutdownNodeIds.contains(localNodeId);

if (shuttingDown) {
// only disable the probe and log if the probe is running
Expand All @@ -236,10 +249,10 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.info("marking node as not ready because it's shutting down");
}
} else {
boolean masterElected = getReadinessState(clusterState, event.previousState(), this::isMasterElected, "masterElected");
boolean masterElected = getReadinessState(clusterState, previousState, this::isMasterElected, "masterElected");
boolean fileSettingsApplied = getReadinessState(
clusterState,
event.previousState(),
previousState,
this::areFileSettingsApplied,
"fileSettingsApplied"
);
Expand All @@ -254,7 +267,7 @@ private boolean getReadinessState(
String description
) {
boolean newStateValue = accessor.apply(clusterState);
boolean oldStateValue = accessor.apply(previousState);
boolean oldStateValue = previousState != null && accessor.apply(previousState);
if (oldStateValue != newStateValue) {
logger.info("readiness change: {}={}", description, newStateValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -51,6 +51,9 @@

import static org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata.ErrorKind.TRANSIENT;
import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReadinessServiceTests extends ESTestCase implements ReadinessClientProbe {
private ClusterService clusterService;
Expand Down Expand Up @@ -101,12 +104,9 @@ public HttpStats stats() {
public void setUp() throws Exception {
super.setUp();
threadpool = new TestThreadPool("readiness_service_tests");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadpool,
null
);
clusterService = mock(ClusterService.class);
when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED);
when(clusterService.state()).thenReturn(emptyState());
env = newEnvironment(Settings.builder().put(ReadinessService.PORT.getKey(), 0).build());

httpTransport = new FakeHttpTransport();
Expand All @@ -115,6 +115,14 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
// make sure readiness service is shut down
if (readinessService.lifecycleState() == Lifecycle.State.STARTED) {
readinessService.stop();
}
if (readinessService.lifecycleState() == Lifecycle.State.STOPPED) {
readinessService.close();
}

super.tearDown();
threadpool.shutdownNow();
}
Expand Down Expand Up @@ -195,9 +203,6 @@ public void testTCPProbe() throws Exception {

// test that we cannot connect to the socket anymore
tcpReadinessProbeFalse(readinessService);

readinessService.stop();
readinessService.close();
}

public void testStatusChange() throws Exception {
Expand Down Expand Up @@ -280,9 +285,6 @@ public void testStatusChange() throws Exception {
}
assertFalse(readinessService.ready());
tcpReadinessProbeFalse(readinessService);

readinessService.stop();
readinessService.close();
}

public void testFileSettingsUpdateError() throws Exception {
Expand All @@ -302,9 +304,13 @@ public void testFileSettingsUpdateError() throws Exception {
ClusterChangedEvent event = new ClusterChangedEvent("test", state, ClusterState.EMPTY_STATE);
readinessService.clusterChanged(event);
assertTrue(readinessService.ready());
}

readinessService.stop();
readinessService.close();
public void testAlreadyReadyWhenStarted() throws Exception {
ClusterState readyState = ClusterState.builder(noFileSettingsState()).metadata(emptyReservedStateMetadata).build();
when(clusterService.state()).thenReturn(readyState);
readinessService.start();
assertTrue(readinessService.ready());
}

private ClusterState emptyState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.nio.channels.spi.SelectorProvider;
import java.util.Set;

import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MockReadinessService extends ReadinessService {
/**
* Marker plugin used by {@link MockNode} to enable {@link MockReadinessService}.
Expand Down Expand Up @@ -100,25 +104,11 @@ private static boolean socketIsOpen(ReadinessService readinessService) {
return mockedSocket != null && mockedSocket.isOpen();
}

public static void tcpReadinessProbeTrue(ReadinessService readinessService) throws InterruptedException {
for (int i = 1; i <= RETRIES; ++i) {
if (socketIsOpen(readinessService)) {
return;
}
Thread.sleep(RETRY_DELAY_IN_MILLIS * i);
}

throw new AssertionError("Readiness socket should be open");
public static void tcpReadinessProbeTrue(ReadinessService readinessService) throws Exception {
assertBusy(() -> assertTrue("Readiness socket should be open", socketIsOpen(readinessService)));
}

public static void tcpReadinessProbeFalse(ReadinessService readinessService) throws InterruptedException {
for (int i = 0; i < RETRIES; ++i) {
if (socketIsOpen(readinessService) == false) {
return;
}
Thread.sleep(RETRY_DELAY_IN_MILLIS * i);
}

throw new AssertionError("Readiness socket should be closed");
public static void tcpReadinessProbeFalse(ReadinessService readinessService) throws Exception {
assertBusy(() -> assertFalse("Readiness socket should be close", socketIsOpen(readinessService)));
}
}