@@ -18,6 +18,10 @@

package org.apache.hadoop.ozone.upgrade;

import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -39,6 +43,8 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
protected TreeMap<Integer, T> features = new TreeMap<>();
protected Map<String, T> featureMap = new HashMap<>();
protected volatile boolean isInitialized = false;
protected volatile UpgradeFinalizer.Status currentUpgradeState =
FINALIZATION_REQUIRED;

protected void init(int version, T[] lfs) throws IOException {

@@ -52,10 +58,16 @@ protected void init(int version, T[] lfs) throws IOException {
String.format("Cannot initialize VersionManager. Metadata " +
"layout version (%d) > software layout version (%d)",
metadataLayoutVersion, softwareLayoutVersion));
} else if (metadataLayoutVersion == softwareLayoutVersion) {
currentUpgradeState = ALREADY_FINALIZED;
}
}
}

public UpgradeFinalizer.Status getUpgradeState() {
return currentUpgradeState;
}

private void initializeFeatures(T[] lfs) {
Arrays.stream(lfs).forEach(f -> {
Preconditions.checkArgument(!featureMap.containsKey(f.name()));
@@ -71,6 +83,7 @@ protected void reset() {
featureMap.clear();
features.clear();
isInitialized = false;
currentUpgradeState = ALREADY_FINALIZED;
}

public void finalized(T layoutFeature) {
@@ -92,6 +105,10 @@ public void finalized(T layoutFeature) {
}
}

public void completeFinalization() {
currentUpgradeState = FINALIZATION_DONE;
}

private boolean softwareIsBehindMetaData() {
return metadataLayoutVersion > softwareLayoutVersion;
}
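For context, a minimal sketch (not part of this patch) of how the new upgrade-state API is meant to be exercised. ToyLayoutVersionManager is a hypothetical subclass; it assumes LayoutFeature lives in the same package and that the abstract base needs no further overrides:

import java.io.IOException;
import org.apache.hadoop.ozone.upgrade.AbstractLayoutVersionManager;
import org.apache.hadoop.ozone.upgrade.LayoutFeature;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;

// Illustration only: a toy concrete manager wrapping the protected init().
class ToyLayoutVersionManager
    extends AbstractLayoutVersionManager<LayoutFeature> {

  ToyLayoutVersionManager(int mlv, LayoutFeature[] features)
      throws IOException {
    // init() leaves the state at FINALIZATION_REQUIRED, or flips it to
    // ALREADY_FINALIZED when the metadata layout version already matches
    // the software layout version.
    init(mlv, features);
  }

  void finalizeIfNeeded() {
    if (getUpgradeState() == UpgradeFinalizer.Status.FINALIZATION_REQUIRED) {
      // ... run finalization for each pending layout feature ...
      completeFinalization();  // state becomes FINALIZATION_DONE
    }
  }
}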
@@ -16,18 +16,12 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.FinalizeNewLayoutVersionCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@@ -63,16 +57,9 @@ public FinalizeNewLayoutVersionCommandHandler() {
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing FinalizeNewLayoutVersionCommandHandler command.");
LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
final DatanodeDetails datanodeDetails = context.getParent()
.getDatanodeDetails();
final FinalizeNewLayoutVersionCommandProto finalizeCommand =
((FinalizeNewLayoutVersionCommand)command).getProto();
final ContainerController controller = ozoneContainer.getController();
final boolean finalizeUpgrade =
finalizeCommand.getFinalizeNewLayoutVersion();
try {
// TODO : finalization logic
if (LOG.isDebugEnabled()) {
@@ -44,6 +44,13 @@ public FinalizeNewLayoutVersionCommand(boolean finalizeNewLayoutVersion,
this.layoutInfo = layoutInfo;
}

public FinalizeNewLayoutVersionCommand(boolean finalizeNewLayoutVersion,
LayoutVersionProto layoutInfo) {
super();
finalizeUpgrade = finalizeNewLayoutVersion;
this.layoutInfo = layoutInfo;
}
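For illustration (not part of the diff), the added constructor is the variant that calls the no-argument super() and can be built straight from a LayoutVersionProto, roughly as the SCM-side code further down in this PR does:

// Sketch only: slv and mlv are placeholder version numbers.
LayoutVersionProto layout = LayoutVersionProto.newBuilder()
    .setSoftwareLayoutVersion(slv)
    .setMetadataLayoutVersion(mlv)
    .build();
FinalizeNewLayoutVersionCommand finalizeCmd =
    new FinalizeNewLayoutVersionCommand(true, layout);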

/**
* Returns the type of this command.
*
2 changes: 2 additions & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -130,6 +130,8 @@ enum NodeState {
DEAD = 3;
DECOMMISSIONING = 4;
DECOMMISSIONED = 5;
HEALTHY_READ_ONLY = 6;

}

enum QueryScope {
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -184,6 +185,16 @@ Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
void processNodeReport(DatanodeDetails datanodeDetails,
NodeReportProto nodeReport);

/**
* Process Node LayoutVersion report.
*
* @param datanodeDetails - datanode that sent the report.
* @param layoutReport - layout version report from the datanode.
*/
void processLayoutVersionReport(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutReport);


/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
* @param dnID - Datanode uuid.
@@ -49,6 +49,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -65,6 +66,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ReflectionUtils;
@@ -92,7 +94,7 @@
*/
public class SCMNodeManager implements NodeManager {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);

private final NodeStateManager nodeStateManager;
@@ -110,6 +112,7 @@ public class SCMNodeManager implements NodeManager {
private final int numPipelinesPerMetadataVolume;
private final int heavyNodeCriteria;
private final HDDSLayoutVersionManager scmLayoutVersionManager;
private final EventPublisher scmNodeEventPublisher;

/**
* Constructs SCM machine Manager.
@@ -119,6 +122,7 @@ public SCMNodeManager(OzoneConfiguration conf,
EventPublisher eventPublisher,
NetworkTopology networkTopology,
HDDSLayoutVersionManager layoutVersionManager) {
this.scmNodeEventPublisher = eventPublisher;
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
this.version = VersionInfo.getLatestVersion();
this.commandQueue = new CommandQueue();
@@ -400,6 +404,64 @@ public void processNodeReport(DatanodeDetails datanodeDetails,
}
}

/**
* Process Layout Version report.
*
* @param datanodeDetails - datanode that sent the report.
* @param layoutVersionReport - layout version report from the datanode.
*/
@Override
public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutVersionReport) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing Layout Version report from [datanode={}]",
datanodeDetails.getHostName());
}
if (LOG.isTraceEnabled()) {
LOG.trace("HB is received from [datanode={}]: <json>{}</json>",
datanodeDetails.getHostName(),
layoutVersionReport.toString().replaceAll("\n", "\\\\n"));
}

if (layoutVersionReport != null) {
int scmSlv = scmLayoutVersionManager.getSoftwareLayoutVersion();
int scmMlv = scmLayoutVersionManager.getMetadataLayoutVersion();
int dnSlv = layoutVersionReport.getSoftwareLayoutVersion();
int dnMlv = layoutVersionReport.getMetadataLayoutVersion();

// If the data node slv is > scm slv => log error condition
if (dnSlv > scmSlv) {
LOG.error("Rogue data node in the cluster : {}. " +
"DataNode SoftwareLayoutVersion = {}, SCM " +
"SoftwareLayoutVersion = {}",
datanodeDetails.getHostName(), dnSlv, scmSlv);
}

// If the datanode mlv < scm mlv, it cannot be allowed to be part of
// any pipeline. However, it can still be allowed to join the cluster.
if (dnMlv < scmMlv) {
LOG.warn("Data node {} can not be used in any pipeline in the " +
"cluster. " + "DataNode MetadataLayoutVersion = {}, SCM " +
"MetadataLayoutVersion = {}",
datanodeDetails.getHostName(), dnMlv, scmMlv);

// TBD: Add NEED_UPGRADE state and fill out state transitions
// around this state. Fire event to move this data node to
// NEED_UPGRADE state. The DataNode will be considered HEALTHY in
// this state, but it cannot be made part of any Pipeline.

// Also send a Finalize command to the data node. It is OK to
// send the Finalize command multiple times.
scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(datanodeDetails.getUuid(),
new FinalizeNewLayoutVersionCommand(true,
LayoutVersionProto.newBuilder()
.setSoftwareLayoutVersion(dnSlv)
.setMetadataLayoutVersion(dnSlv).build())));
}
}
}
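For illustration only (not in this patch), the new report path can be driven directly, e.g. from a test; nodeManager and dn stand for an SCMNodeManager instance and a registered DatanodeDetails, and the SCM's MLV is assumed to be greater than 0:

// A report whose MLV lags the SCM's MLV: expect the warning above plus a
// FinalizeNewLayoutVersionCommand fired through the DATANODE_COMMAND event.
LayoutVersionProto staleLayout = LayoutVersionProto.newBuilder()
    .setSoftwareLayoutVersion(1)
    .setMetadataLayoutVersion(0)
    .build();
nodeManager.processLayoutVersionReport(dn, staleLayout);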

/**
* Returns the aggregated node stats.
*
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -101,6 +102,11 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
commands = nodeManager.getCommandQueue(dnID);

} else {
if (heartbeat.hasDataNodeLayoutVersion()) {
LOG.debug("Processing DataNode Layout Report.");
nodeManager.processLayoutVersionReport(datanodeDetails,
heartbeat.getDataNodeLayoutVersion());
}

// should we dispatch heartbeat through eventPublisher?
commands = nodeManager.processHeartbeat(datanodeDetails);
@@ -214,6 +220,18 @@ public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
}
}

/**
* Layout report event payload with origin.
*/
public static class LayoutReportFromDatanode
extends ReportFromDatanode<LayoutVersionProto> {

public LayoutReportFromDatanode(DatanodeDetails datanodeDetails,
LayoutVersionProto report) {
super(datanodeDetails, report);
}
}
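As a sketch (not part of the patch) of how a layout report reaches the dispatcher change above: the builder and setter names below are the usual protobuf counterparts of the getters used in this diff, and dn.getProtoBufMessage() plus the dispatcher variable are assumed for the example:

// Heartbeat carrying a layout version; dispatch() is expected to route it
// to nodeManager.processLayoutVersionReport(...).
SCMHeartbeatRequestProto heartbeat = SCMHeartbeatRequestProto.newBuilder()
    .setDatanodeDetails(dn.getProtoBufMessage())
    .setDataNodeLayoutVersion(LayoutVersionProto.newBuilder()
        .setSoftwareLayoutVersion(1)
        .setMetadataLayoutVersion(1)
        .build())
    .build();
List<SCMCommand> commands = dispatcher.dispatch(heartbeat);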

/**
* Container report event payload with origin.
*/
@@ -359,6 +359,18 @@ public void processNodeReport(DatanodeDetails dnUuid,
// do nothing
}

/**
* Empty implementation for processLayoutVersionReport.
*
* @param dnUuid - datanode that sent the report.
* @param layoutReport - layout version report from the datanode.
*/
@Override
public void processLayoutVersionReport(DatanodeDetails dnUuid,
LayoutVersionProto layoutReport) {
// do nothing
}

/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
Expand Down