@@ -56,6 +56,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
@@ -84,6 +85,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   private HddsDatanodeHttpServer httpServer;
   private boolean printBanner;
   private String[] args;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
   public HddsDatanodeService(boolean printBanner, String[] args) {
     this.printBanner = printBanner;
@@ -209,7 +211,7 @@ public void start() {
       initializeCertificateClient(conf);
     }
     datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
-        dnCertClient);
+        dnCertClient, this::terminateDatanode);
     try {
       httpServer = new HddsDatanodeHttpServer(conf);
       httpServer.start();
@@ -421,29 +423,37 @@ public void join() {
     }
   }
 
+  public void terminateDatanode() {
+    stop();
+    terminate(1);
+  }
+
   @Override
   public void stop() {
-    if (plugins != null) {
-      for (ServicePlugin plugin : plugins) {
-        try {
-          plugin.stop();
-          LOG.info("Stopped plug-in {}", plugin);
-        } catch (Throwable t) {
-          LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
-        }
-      }
-    }
-    if (datanodeStateMachine != null) {
-      datanodeStateMachine.stopDaemon();
-    }
-    if (httpServer != null) {
-      try {
-        httpServer.stop();
-      } catch (Exception e) {
-        LOG.error("Stopping HttpServer is failed.", e);
+    if (isStopped.compareAndSet(false, true)) {
+      if (plugins != null) {
+        for (ServicePlugin plugin : plugins) {
+          try {
+            plugin.stop();
+            LOG.info("Stopped plug-in {}", plugin);
+          } catch (Throwable t) {
+            LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+          }
+        }
+      }
+      if (datanodeStateMachine != null) {
+        datanodeStateMachine.stopDaemon();
+      }
+      if (httpServer != null) {
+        try {
+          httpServer.stop();
+        } catch (Exception e) {
+          LOG.error("Stopping HttpServer failed.", e);
+        }
       }
     }
   }
 
   @Override
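
The guarded stop() above makes shutdown idempotent: terminateDatanode(), the JVM shutdown hook, and the ServicePlugin framework can all end up calling stop(), and only the first caller should release resources. A minimal, self-contained sketch of the same pattern (the class and resource names here are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch mirroring the guard used in HddsDatanodeService.stop().
public class IdempotentService {
  private final AtomicBoolean isStopped = new AtomicBoolean(false);

  public void stop() {
    // compareAndSet ensures the shutdown body runs at most once, even when
    // stop() is invoked concurrently from several shutdown paths.
    if (!isStopped.compareAndSet(false, true)) {
      return; // already stopped
    }
    releaseResources();
  }

  private void releaseResources() {
    System.out.println("resources released exactly once");
  }

  public static void main(String[] args) {
    IdempotentService svc = new IdempotentService();
    svc.stop();
    svc.stop(); // second call is a no-op
  }
}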

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+/**
+ * Interface which declares a method to stop HddsDatanodeService.
+ */
+public interface HddsDatanodeStopService {
+
+  void stopService();
+}
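
HddsDatanodeStopService declares a single abstract method, so a method reference can implement it; that is how HddsDatanodeService passes this::terminateDatanode into DatanodeStateMachine in the diff above. A minimal sketch of that wiring (the sketch classes are illustrative stand-ins for HddsDatanodeService and DatanodeStateMachine):

// Illustrative wiring sketch; only HddsDatanodeStopService and the
// method-reference idiom come from the PR itself.
interface HddsDatanodeStopService {
  void stopService();
}

class StateMachineSketch {
  private final HddsDatanodeStopService stopService;

  StateMachineSketch(HddsDatanodeStopService stopService) {
    this.stopService = stopService;
  }

  void onCriticalError() {
    // Delegate termination back to whoever constructed us.
    stopService.stopService();
  }
}

public class DatanodeSketch {
  private void terminateDatanode() {
    System.out.println("stopping datanode");
  }

  public static void main(String[] args) {
    DatanodeSketch dn = new DatanodeSketch();
    // A single-abstract-method interface accepts a method reference.
    StateMachineSketch sm = new StateMachineSketch(dn::terminateDatanode);
    sm.onCriticalError();
  }
}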

@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.ozone.HddsDatanodeStopService;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
@@ -84,6 +85,7 @@ public class DatanodeStateMachine implements Closeable {
 
   private JvmPauseMonitor jvmPauseMonitor;
   private CertificateClient dnCertClient;
+  private final HddsDatanodeStopService hddsDatanodeStopService;
 
   /**
    * Constructs a datanode state machine.
@@ -93,7 +95,9 @@ public class DatanodeStateMachine implements Closeable {
    * enabled
    */
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      Configuration conf, CertificateClient certClient) throws IOException {
+      Configuration conf, CertificateClient certClient,
+      HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+    this.hddsDatanodeStopService = hddsDatanodeStopService;
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
     executorService = HadoopExecutors.newCachedThreadPool(
@@ -195,6 +199,14 @@ private void start() throws IOException {
         LOG.error("Unable to finish the execution.", e);
       }
     }
+
+    // If the state machine hit a critical error, its state was set to
+    // SHUTDOWN to stop the state machine thread. In that case the whole
+    // datanode should be stopped as well.
+    if (context.getShutdownOnError()) {
+      LOG.error("DatanodeStateMachine shut down due to a critical error");
+      hddsDatanodeStopService.stopService();
+    }
   }
 
   /**

@@ -73,6 +73,7 @@ public class StateContext {
   private final Queue<ContainerAction> containerActions;
   private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
+  private boolean shutdownOnError = false;
 
   /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -152,6 +153,22 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
     this.state = state;
   }
 
+  /**
+   * Sets shutdownOnError. This needs to be called whenever a task of a
+   * DatanodeState fails and moves the DatanodeState to SHUTDOWN.
+   * @param value new value for the shutdownOnError flag
+   */
+  private void setShutdownOnError(boolean value) {
+    this.shutdownOnError = value;
+  }
+
+  /**
+   * Returns whether the state machine is shutting down because of an error.
+   * @return true if a critical error forced the SHUTDOWN state
+   */
+  public boolean getShutdownOnError() {
+    return shutdownOnError;
+  }
   /**
    * Adds the report to report queue.
    *
@@ -367,6 +384,14 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
       }
       this.setState(newState);
     }
+
+    if (this.state == DatanodeStateMachine.DatanodeStates.SHUTDOWN) {
+      LOG.error("Critical error occurred in StateMachine, setting " +
+          "shutdownOnError");
+      // A task hit a critical error and moved the state to SHUTDOWN; record
+      // it so that the datanode can be terminated.
+      setShutdownOnError(true);
+    }
   }
 }
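
Taken together, DatanodeStateMachine and StateContext hand off the shutdown decision through a flag: execute() records that the SHUTDOWN state was reached because of an error, and the state machine's run loop checks getShutdownOnError() once it exits and escalates to the stop service. A compressed sketch of that control flow (everything other than the getShutdownOnError()/stopService() handoff is simplified):

// Compressed sketch of the shutdown handoff; the real logic lives in
// DatanodeStateMachine.start() and StateContext.execute().
enum DatanodeStates { RUNNING, SHUTDOWN }

class ContextSketch {
  private DatanodeStates state = DatanodeStates.RUNNING;
  private boolean shutdownOnError = false;

  void failCurrentTask() {
    // A critical task failure transitions the state to SHUTDOWN.
    state = DatanodeStates.SHUTDOWN;
  }

  void execute() {
    // ... run the current state's task here ...
    if (state == DatanodeStates.SHUTDOWN) {
      shutdownOnError = true; // remember this was an error shutdown
    }
  }

  boolean getShutdownOnError() {
    return shutdownOnError;
  }
}

public class RunLoopSketch {
  public static void main(String[] args) {
    ContextSketch context = new ContextSketch();
    context.failCurrentTask();  // simulate a critical task failure
    context.execute();          // loop body: run task, record SHUTDOWN

    // After the loop exits, the state machine escalates to the datanode.
    if (context.getShutdownOnError()) {
      System.out.println("stopping datanode due to critical error");
    }
  }
}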

@@ -167,8 +167,6 @@ private void initializeVolumeSet() throws IOException {
 
         checkAndSetClusterID(hddsVolume.getClusterID());
 
-        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
-        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
         LOG.info("Added Volume : {} to VolumeSet",
             hddsVolume.getHddsRootDir().getPath());
 
@@ -177,6 +175,8 @@ private void initializeVolumeSet() throws IOException {
           throw new IOException("Failed to create HDDS storage dir " +
               hddsVolume.getHddsRootDir());
         }
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
       } catch (IOException e) {
         HddsVolume volume = new HddsVolume.Builder(locationString)
             .failedVolume(true).build();
@@ -185,12 +185,14 @@
       }
     }
 
-    checkAllVolumes();
-
+    // First check whether we have any usable volumes: if all volumes have
+    // failed, the volumeMap size will be zero, and we throw an exception.
     if (volumeMap.size() == 0) {
      throw new DiskOutOfSpaceException("No storage locations configured");
     }
 
+    checkAllVolumes();
+
     // Ensure volume threads are stopped and scm df is saved during shutdown.
     shutdownHook = () -> {
       saveVolumeSetUsed();
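
The VolumeSet reordering means a volume is registered in volumeMap only after its storage directory has been created successfully, and the all-volumes-failed check now runs before the disk checker, so a checker that subsequently fails every volume no longer trips the "No storage locations configured" exception. A condensed sketch of the resulting initialization order (the helper methods are stand-ins, and a plain IOException replaces DiskOutOfSpaceException to keep the sketch self-contained):

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Condensed sketch of the reordered initializeVolumeSet() logic.
public class VolumeInitSketch {
  private final Map<String, String> volumeMap = new HashMap<>();
  private final Map<String, String> failedVolumeMap = new HashMap<>();

  void initialize(List<String> locations) throws IOException {
    for (String location : locations) {
      try {
        createStorageDirOrThrow(location);
        // Register the volume only after its directory exists.
        volumeMap.put(location, location);
      } catch (IOException e) {
        failedVolumeMap.put(location, location);
      }
    }

    // Fail fast only if every configured volume is bad ...
    if (volumeMap.isEmpty()) {
      throw new IOException("No storage locations configured");
    }
    // ... and only then hand the surviving volumes to the disk checker.
    checkAllVolumes();
  }

  private void createStorageDirOrThrow(String location) throws IOException {
    // Stand-in for HddsVolume.Builder / mkdirs; always succeeds here.
  }

  private void checkAllVolumes() {
    // Stand-in for HddsVolumeChecker.checkAllVolumes().
  }

  public static void main(String[] args) throws IOException {
    new VolumeInitSketch().initialize(Arrays.asList("/data/disk1"));
    System.out.println("initialized with one good volume");
  }
}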

@@ -160,7 +160,7 @@ public void tearDown() throws Exception {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -222,7 +222,7 @@ public void testDatanodeStateContext() throws IOException,
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
 
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(datanodeDetails, conf, null)) {
+        new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -343,7 +343,7 @@ public void testDatanodeStateMachineWithIdWriteFail() throws Exception {
     datanodeDetails.setPort(port);
 
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(datanodeDetails, conf, null)) {
+        new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -406,7 +406,7 @@ public void testDatanodeStateMachineWithInvalidConfiguration()
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf, null)) {
+          getNewDatanodeDetails(), perTestConf, null, null)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,

@@ -38,6 +38,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.hamcrest.CoreMatchers.is;
 import org.junit.After;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import org.junit.Rule;
@@ -125,14 +127,14 @@ HddsVolumeChecker getVolumeChecker(Configuration configuration)
   }
 
   /**
-   * Verify that initialization fails if all volumes are bad.
+   * Verify that all volumes are added to the failed list if all volumes
+   * are bad.
    */
   @Test
   public void testAllVolumesAreBad() throws IOException {
     final int numVolumes = 5;
 
     conf = getConfWithDataNodeDirs(numVolumes);
-    thrown.expect(IOException.class);
+
     final VolumeSet volumeSet = new VolumeSet(
         UUID.randomUUID().toString(), conf) {
       @Override
@@ -141,6 +143,9 @@ HddsVolumeChecker getVolumeChecker(Configuration configuration)
         return new DummyChecker(configuration, new Timer(), numVolumes);
       }
     };
+
+    assertEquals(volumeSet.getFailedVolumesList().size(), numVolumes);
+    assertEquals(volumeSet.getVolumesList().size(), 0);
   }
 
   /**

Review comment from the author on the new assertions: this is updated because
of the code reordering, since checkAllVolumes() is now called only when the
volumeMap size is not zero.

@@ -175,6 +175,10 @@ public void testGetVersionTask() throws Exception {
   @Test
   public void testCheckVersionResponse() throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        true);
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+        true);
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
@@ -478,7 +482,7 @@ private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
 
     // Create a datanode state machine for the StateContext used by the endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        TestUtils.randomDatanodeDetails(), conf, null);
+        TestUtils.randomDatanodeDetails(), conf, null, null);
         EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =

@@ -175,11 +175,11 @@ public void testContainerRandomPort() throws IOException {
         true);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@@ -198,11 +198,11 @@ public void testContainerRandomPort() throws IOException {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));