Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void start() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
dnCertClient, secretKeyClient, this::terminateDatanode,
reconfigurationHandler);
try {
Expand Down Expand Up @@ -619,6 +619,10 @@ public void saveNewCertId(String newCertId) {
}
}

public boolean isStopped() {
return isStopped.get();
}

/**
* Check ozone admin privilege, throws exception if not admin.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
Expand Down Expand Up @@ -138,7 +139,9 @@ public class DatanodeStateMachine implements Closeable {
* @param certClient - Datanode Certificate client, required if security is
* enabled
*/
public DatanodeStateMachine(DatanodeDetails datanodeDetails,
@SuppressWarnings("checkstyle:ParameterNumber")
public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
DatanodeDetails datanodeDetails,
ConfigurationSource conf,
CertificateClient certClient,
SecretKeyClient secretKeyClient,
Expand Down Expand Up @@ -178,7 +181,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
// HDDS-3116 for more details.
constructionLock.writeLock().lock();
try {
container = new OzoneContainer(this.datanodeDetails,
container = new OzoneContainer(hddsDatanodeService, this.datanodeDetails,
conf, context, certClient, secretKeyClient);
} finally {
constructionLock.writeLock().unlock();
Expand Down Expand Up @@ -274,7 +277,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
@VisibleForTesting
public DatanodeStateMachine(DatanodeDetails datanodeDetails,
ConfigurationSource conf) throws IOException {
this(datanodeDetails, conf, null, null, null,
this(null, datanodeDetails, conf, null, null, null,
new ReconfigurationHandler("DN", (OzoneConfiguration) conf, op -> { }));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.Cache;
import org.apache.hadoop.hdds.utils.ResourceCache;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
Expand All @@ -70,6 +72,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
Expand All @@ -94,6 +97,7 @@
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;
Expand Down Expand Up @@ -198,19 +202,23 @@ long getStartTime() {

private final Semaphore applyTransactionSemaphore;
private final boolean waitOnBothFollowers;
private final HddsDatanodeService datanodeService;
private static Semaphore semaphore = new Semaphore(1);

/**
* CSM metrics.
*/
private final CSMMetrics metrics;

@SuppressWarnings("parameternumber")
public ContainerStateMachine(RaftGroupId gid,
public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupId gid,
ContainerDispatcher dispatcher,
ContainerController containerController,
List<ThreadPoolExecutor> chunkExecutors,
XceiverServerRatis ratisServer,
ConfigurationSource conf,
String threadNamePrefix) {
this.datanodeService = hddsDatanodeService;
this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
Expand Down Expand Up @@ -877,6 +885,49 @@ public void notifyTermIndexUpdated(long term, long index) {
removeStateMachineDataIfNeeded(index);
}

@Override
public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allServer) {
// if datanodeService is stopped , it indicates this `close` originates
// from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` originates from ratis.
if (allServer) {
if (datanodeService != null && !datanodeService.isStopped()) {
LOG.info("{} is closed by ratis", gid);
if (semaphore.tryAcquire()) {
// run with a different thread, so this raft group can be closed
Runnable runnable = () -> {
try {
int closed = 0, total = 0;
try {
Thread.sleep(5000); // sleep 5s
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Iterator<RaftGroupId> iterator = ratisServer.getServer().getGroupIds().iterator();
while (iterator.hasNext()) {
RaftGroupId id = iterator.next();
RaftServer.Division division = ratisServer.getServer().getDivision(id);
if (division.getRaftServer().getLifeCycleState() == LifeCycle.State.CLOSED) {
closed++;
}
total++;
}
LOG.error("Container statemachine is closed by ratis, terminating HddsDatanodeService. " +
"closed({})/total({})", closed, total);
datanodeService.terminateDatanode();
} catch (IOException e) {
LOG.warn("Failed to get division for raft groups", e);
LOG.error("Container statemachine is closed by ratis, terminating HddsDatanodeService");
datanodeService.terminateDatanode();
}
};
CompletableFuture.runAsync(runnable);
}
} else {
LOG.info("{} is closed by HddsDatanodeService", gid);
}
}
}

private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
ContainerCommandRequestProto request, DispatcherContext context,
Consumer<Throwable> exceptionHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
Expand Down Expand Up @@ -178,13 +179,15 @@ private static long nextCallId() {
private final boolean shouldDeleteRatisLogDirectory;
private final boolean streamEnable;
private final DatanodeRatisServerConfig ratisServerConfig;
private final HddsDatanodeService datanodeService;

private XceiverServerRatis(DatanodeDetails dd,
private XceiverServerRatis(HddsDatanodeService hddsDatanodeService, DatanodeDetails dd,
ContainerDispatcher dispatcher, ContainerController containerController,
StateContext context, ConfigurationSource conf, Parameters parameters)
throws IOException {
this.conf = conf;
Objects.requireNonNull(dd, "DatanodeDetails == null");
datanodeService = hddsDatanodeService;
datanodeDetails = dd;
ratisServerConfig = conf.getObject(DatanodeRatisServerConfig.class);
assignPorts();
Expand Down Expand Up @@ -242,7 +245,7 @@ private int determinePort(String key, int defaultValue) {
}

private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, containerController,
return new ContainerStateMachine(datanodeService, gid, dispatcher, containerController,
chunkExecutors, this, conf, datanodeDetails.threadNamePrefix());
}

Expand Down Expand Up @@ -522,14 +525,14 @@ private void setPendingRequestsLimits(RaftProperties properties) {
.valueOf(pendingRequestsMegaBytesLimit, TraditionalBinaryPrefix.MEGA));
}

public static XceiverServerRatis newXceiverServerRatis(
public static XceiverServerRatis newXceiverServerRatis(HddsDatanodeService hddsDatanodeService,
DatanodeDetails datanodeDetails, ConfigurationSource ozoneConf,
ContainerDispatcher dispatcher, ContainerController containerController,
CertificateClient caClient, StateContext context) throws IOException {
Parameters parameters = createTlsParameters(
new SecurityConfig(ozoneConf), caClient);

return new XceiverServerRatis(datanodeDetails, dispatcher,
return new XceiverServerRatis(hddsDatanodeService, datanodeDetails, dispatcher,
containerController, context, ozoneConf, parameters);
}

Expand Down Expand Up @@ -594,6 +597,7 @@ private int getRealPort(InetSocketAddress address, Port.Name name) {
public void stop() {
if (isStarted) {
try {
LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
// shutdown server before the executors as while shutting down,
// some of the tasks would be executed using the executors.
server.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
Expand Down Expand Up @@ -138,7 +139,7 @@ enum InitializingStatus {
* @throws DiskOutOfSpaceException
* @throws IOException
*/
public OzoneContainer(
public OzoneContainer(HddsDatanodeService hddsDatanodeService,
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context, CertificateClient certClient,
SecretKeyVerifierClient secretKeyClient) throws IOException {
Expand Down Expand Up @@ -205,7 +206,7 @@ public OzoneContainer(
*/
controller = new ContainerController(containerSet, handlers);

writeChannel = XceiverServerRatis.newXceiverServerRatis(
writeChannel = XceiverServerRatis.newXceiverServerRatis(hddsDatanodeService,
datanodeDetails, config, hddsDispatcher, controller, certClient,
context);

Expand Down Expand Up @@ -277,7 +278,7 @@ public OzoneContainer(
public OzoneContainer(
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context) throws IOException {
this(datanodeDetails, conf, context, null, null);
this(null, datanodeDetails, conf, context, null, null);
}

public GrpcTlsConfig getTlsClientConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public static XceiverServerRatis newXceiverServerRatis(
conf.setInt(OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());

return XceiverServerRatis.newXceiverServerRatis(dn, conf,
return XceiverServerRatis.newXceiverServerRatis(null, dn, conf,
getNoopContainerDispatcher(), getEmptyContainerController(),
null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ static XceiverServerRatis newXceiverServerRatis(
conf.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);

final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
return XceiverServerRatis.newXceiverServerRatis(null, dn, conf, dispatcher,
new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private OzoneContainer createAndStartOzoneContainerInstance() {
try {
StateContext stateContext = ContainerTestUtils.getMockContext(dn, conf);
container = new OzoneContainer(
dn, conf, stateContext, caClient, keyClient);
null, dn, conf, stateContext, caClient, keyClient);
MutableVolumeSet volumeSet = container.getVolumeSet();
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void testCreateOzoneContainer(boolean requireToken, boolean hasToken,
conf.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_IPC_RANDOM_PORT, false);

DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
container = new OzoneContainer(dn, conf, ContainerTestUtils
container = new OzoneContainer(null, dn, conf, ContainerTestUtils
.getMockContext(dn, conf), caClient, secretKeyClient);
MutableVolumeSet volumeSet = container.getVolumeSet();
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static XceiverServerRatis newXceiverServerRatis(
conf.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);

final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
return XceiverServerRatis.newXceiverServerRatis(null, dn, conf, dispatcher,
new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
caClient, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ XceiverServerRatis newXceiverServerRatis(
conf.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = createDispatcher(dn,
UUID.randomUUID(), conf);
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
return XceiverServerRatis.newXceiverServerRatis(null, dn, conf, dispatcher,
new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
caClient, null);
}
Expand Down