Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,29 @@ public final class HddsConfigKeys {
public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
"hdds.scm.safemode.pipeline-availability.check";
public static final boolean
HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;

public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
"hdds.scm.safemode.pipeline.creation";
public static final boolean
HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;

// % of containers which should have at least one reported replica
// before SCM comes out of safe mode.
public static final String HDDS_SCM_SAFEMODE_THRESHOLD_PCT =
"hdds.scm.safemode.threshold.pct";
public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;


// Percentage of pipelines that must be healthy (i.e. all three datanodes in
// the pipeline have reported) before SCM exits safe mode.
public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
"hdds.scm.safemode.healthy.pipelie.pct";
public static final double
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
// number of healthy RATIS pipeline(ONE or THREE factor)
public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
"hdds.scm.safemode.min.pipeline";
public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;

public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
"hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public boolean isOpen() {
return state == PipelineState.OPEN;
}

/**
 * Reports whether this pipeline has exceeded its allocation timeout.
 *
 * NOTE(review): currently a stub — it always returns {@code false}, so no
 * pipeline is ever considered timed out until the TODO below is implemented.
 */
public boolean isAllocationTimeout() {
//TODO: define a system property to control the timeout value
return false;
}

public void setNodesInOrder(List<DatanodeDetails> nodes) {
nodesInOrder.set(nodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -72,9 +73,9 @@ public void schedule(CheckedRunnable runnable, long delay,
}, delay, timeUnit);
}

public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
long fixedDelay, TimeUnit timeUnit) {
scheduler
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
long initialDelay, long fixedDelay, TimeUnit timeUnit) {
return scheduler
.scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
}

Expand Down
30 changes: 20 additions & 10 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,6 @@
datanode periodically send container report to SCM. Unit could be
defined with postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
<name>hdds.command.status.report.interval</name>
<value>60000ms</value>
<tag>OZONE, CONTAINER, MANAGEMENT</tag>
<description>Time interval of the datanode to send status of command
execution. Each datanode periodically the execution status of commands
received from SCM to SCM. Unit could be defined with postfix
(ns,ms,s,m,h,d)</description>
</property>
<property>
<name>hdds.pipeline.report.interval</name>
<value>60000ms</value>
Expand Down Expand Up @@ -1300,7 +1291,7 @@

<property>
<name>hdds.scm.safemode.pipeline-availability.check</name>
<value>false</value>
<value>true</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Boolean value to enable pipeline availability check during SCM safe mode.
Expand Down Expand Up @@ -1385,6 +1376,25 @@
</description>
</property>

<property>
<name>hdds.scm.safemode.pipeline.creation</name>
<value>true</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Boolean value to enable background pipeline creation in SCM safe mode.
</description>
</property>

<property>
<name>hdds.scm.safemode.min.pipeline</name>
<value>1</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
The minimum number of RATIS pipelines required to exit SCM safe mode.
Only considered when "hdds.scm.safemode.pipeline.creation" is true.
</description>
</property>

<property>
<name>hdds.lock.max.concurrency</name>
<value>100</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CreatePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
Expand Down Expand Up @@ -126,6 +130,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
conf))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler())
.addHandler(new ClosePipelineCommandHandler())
.addHandler(new CreatePipelineCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Handler for close container command received from SCM.
Expand All @@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CloseContainerCommandHandler.class);

private int invocationCount;
private AtomicLong invocationCount = new AtomicLong(0);
private long totalTime;

/**
Expand All @@ -69,7 +70,7 @@ public CloseContainerCommandHandler() {
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
invocationCount++;
invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
final DatanodeDetails datanodeDetails = context.getParent()
.getDatanodeDetails();
Expand Down Expand Up @@ -161,7 +162,7 @@ public SCMCommandProto.Type getCommandType() {
*/
@Override
public int getInvocationCount() {
  // Counter is kept as an AtomicLong for thread-safe increments; narrow to
  // int only here, at the reporting boundary required by the interface.
  return (int) invocationCount.get();
}

/**
Expand All @@ -171,8 +172,8 @@ public int getInvocationCount() {
*/
@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Handler for close pipeline command received from SCM.
 *
 * Removes the pipeline's group from this datanode's write channel. Both
 * stats counters are {@link AtomicLong} so that the values reported via
 * {@link #getInvocationCount()} and {@link #getAverageRunTime()} remain
 * consistent if commands are handled concurrently.
 */
public class ClosePipelineCommandHandler implements CommandHandler {

  private static final Logger LOG =
      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);

  // Number of times handle() has been invoked.
  private final AtomicLong invocationCount = new AtomicLong(0);
  // Total wall-clock time spent inside handle(), across all invocations.
  private final AtomicLong totalTime = new AtomicLong(0);

  /**
   * Constructs a closePipelineCommand handler.
   */
  public ClosePipelineCommandHandler() {
  }

  /**
   * Handles a given SCM command.
   *
   * @param command - SCM Command
   * @param ozoneContainer - Ozone Container.
   * @param context - Current Context.
   * @param connectionManager - The SCMs that we are talking to.
   */
  @Override
  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
      StateContext context, SCMConnectionManager connectionManager) {
    invocationCount.incrementAndGet();
    final long startTime = Time.monotonicNow();
    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
    final ClosePipelineCommandProto closeCommand =
        ((ClosePipelineCommand) command).getProto();
    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();

    try {
      XceiverServerSpi server = ozoneContainer.getWriteChannel();
      server.removeGroup(pipelineID);
      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
          dn.getUuidString());
    } catch (IOException e) {
      // Best-effort: a failed close is logged, not rethrown, so the command
      // dispatcher keeps processing subsequent commands.
      LOG.error("Can't close pipeline #{}", pipelineID, e);
    } finally {
      long endTime = Time.monotonicNow();
      // AtomicLong keeps the running total consistent with the atomic
      // invocation counter under concurrent handling.
      totalTime.addAndGet(endTime - startTime);
    }
  }

  /**
   * Returns the command type that this command handler handles.
   *
   * @return Type
   */
  @Override
  public SCMCommandProto.Type getCommandType() {
    return SCMCommandProto.Type.closePipelineCommand;
  }

  /**
   * Returns number of times this handler has been invoked.
   *
   * @return int
   */
  @Override
  public int getInvocationCount() {
    // Interface requires int; truncation only matters past Integer.MAX_VALUE.
    return (int) invocationCount.get();
  }

  /**
   * Returns the average time this function takes to run.
   *
   * @return long
   */
  @Override
  public long getAverageRunTime() {
    // Read once so the zero-check and the division use the same value.
    final long count = invocationCount.get();
    if (count > 0) {
      return totalTime.get() / count;
    }
    return 0;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void handle(SCMCommand command, OzoneContainer container,
default void updateCommandStatus(StateContext context, SCMCommand command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.debug("{} with Id:{} not found.", command.getType(),
log.warn("{} with Id:{} not found.", command.getType(),
command.getId());
}
}
Expand Down
Loading