HddsDispatcher.java
@@ -321,6 +321,11 @@ private ContainerCommandResponseProto dispatchRequest(
// Once container is marked unhealthy, all the subsequent write
// transactions will fail with UNHEALTHY_CONTAINER exception.

if (container == null) {
throw new NullPointerException(
"Error on creating containers " + result + " " + responseProto
.getMessage());
}
// For container to be moved to unhealthy state here, the container can
// only be in open or closing state.
State containerState = container.getContainerData().getState();
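Reviewer note: the guard added above fails fast with the create-failure message instead of letting the getContainerData() call below throw a bare NullPointerException. A behavior-equivalent alternative (a sketch, not what this patch does) is the JDK's Objects.requireNonNull supplier overload, which defers building the message to the failure path:

    // java.util.Objects, JDK 8+: throws NullPointerException with the
    // supplied message only when container is actually null.
    Objects.requireNonNull(container, () ->
        "Error on creating containers " + result + " " + responseProto.getMessage());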
Handler.java
@@ -18,25 +18,19 @@

package org.apache.hadoop.ozone.container.common.interfaces;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -54,41 +48,46 @@ public abstract class Handler {
protected final VolumeSet volumeSet;
protected String scmID;
protected final ContainerMetrics metrics;
protected String datanodeId;
private Consumer<ContainerReplicaProto> icrSender;

private final StateContext context;
private final DatanodeDetails datanodeDetails;

protected Handler(Configuration config, StateContext context,
protected Handler(Configuration config, String datanodeId,
ContainerSet contSet, VolumeSet volumeSet,
ContainerMetrics containerMetrics) {
ContainerMetrics containerMetrics,
Consumer<ContainerReplicaProto> icrSender) {
this.conf = config;
this.context = context;
this.containerSet = contSet;
this.volumeSet = volumeSet;
this.metrics = containerMetrics;
this.datanodeDetails = context.getParent().getDatanodeDetails();
this.datanodeId = datanodeId;
this.icrSender = icrSender;
}

public static Handler getHandlerForContainerType(
final ContainerType containerType, final Configuration config,
final StateContext context, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics) {
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
Consumer<ContainerReplicaProto> icrSender) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config, context, contSet, volumeSet, metrics);
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
icrSender);
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
containerType + "doesn't exist.");
}
}

/**
* Returns the Id of this datanode.
*
* @return datanode Id
*/
protected DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
protected String getDatanodeId() {
return datanodeId;
}

/**
* This should be called whenever there is state change. It will trigger
* an ICR to SCM.
@@ -97,12 +96,8 @@ protected DatanodeDetails getDatanodeDetails() {
*/
protected void sendICR(final Container container)
throws StorageContainerException {
IncrementalContainerReportProto icr = IncrementalContainerReportProto
.newBuilder()
.addReport(container.getContainerReport())
.build();
context.addReport(icr);
context.getParent().triggerHeartbeat();
ContainerReplicaProto containerReport = container.getContainerReport();
icrSender.accept(containerReport);
}

public abstract ContainerCommandResponseProto handle(
@@ -175,8 +170,9 @@ public abstract void closeContainer(Container container)
* Deletes the given container.
*
* @param container container to be deleted
* @param force if this is set to true, we delete container without checking
* state of the container.
* @param force if this is set to true, we delete container without
* checking
* state of the container.
* @throws IOException
*/
public abstract void deleteContainer(Container container, boolean force)
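Net effect of the Handler changes: the class no longer reaches into StateContext. It takes the datanode UUID as a plain String and reports container state changes through an injected Consumer, so a Handler can be built without a running datanode state machine. A minimal sketch of the new seam, assuming conf, containerSet, volumeSet, and metrics are prepared as in the tests further down:

    // Capture ICRs in a plain list instead of routing them through SCM.
    List<ContainerReplicaProto> sentReports = new ArrayList<>();
    Handler handler = Handler.getHandlerForContainerType(
        ContainerType.KeyValueContainer, conf,
        UUID.randomUUID().toString(),   // datanode id is now just a string
        containerSet, volumeSet, metrics,
        sentReports::add);              // icrSender is any Consumer

Closing a container through such a handler should then leave the emitted ContainerReplicaProto in sentReports for assertions.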
KeyValueHandler.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
@@ -47,6 +48,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
@@ -60,7 +62,6 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
@@ -109,9 +110,10 @@ public class KeyValueHandler extends Handler {
private final AutoCloseableLock containerCreationLock;
private final boolean doSyncWrite;

public KeyValueHandler(Configuration config, StateContext context,
ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
super(config, context, contSet, volSet, metrics);
public KeyValueHandler(Configuration config, String datanodeId,
ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics,
Consumer<ContainerReplicaProto> icrSender) {
super(config, datanodeId, contSet, volSet, metrics, icrSender);
containerType = ContainerType.KeyValueContainer;
blockManager = new BlockManagerImpl(config);
doSyncWrite =
@@ -220,7 +222,7 @@ ContainerCommandResponseProto handleCreateContainer(

KeyValueContainerData newContainerData = new KeyValueContainerData(
containerID, maxContainerSize, request.getPipelineID(),
getDatanodeDetails().getUuidString());
getDatanodeId());
// TODO: Add support to add metadataList to ContainerData. Add metadata
// to container during creation.
KeyValueContainer newContainer = new KeyValueContainer(
OzoneContainer.java
@@ -18,16 +18,21 @@

package org.apache.hadoop.ozone.container.ozoneimpl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -42,24 +47,20 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;

import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication
.OnDemandContainerReplicationSource;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.ozone.OzoneConfigKeys.*;

/**
* Ozone main class sets up the network servers and initializes the container
* layer.
@@ -100,10 +101,22 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
buildContainerSet();
final ContainerMetrics metrics = ContainerMetrics.create(conf);
this.handlers = Maps.newHashMap();

Consumer<ContainerReplicaProto> icrSender = containerReplicaProto -> {
IncrementalContainerReportProto icr = IncrementalContainerReportProto
.newBuilder()
.addReport(containerReplicaProto)
.build();
context.addReport(icr);
context.getParent().triggerHeartbeat();
};

for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, context, containerSet, volumeSet, metrics));
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, icrSender));
}

SecurityConfig secConf = new SecurityConfig(conf);
@@ -169,7 +182,6 @@ private void buildContainerSet() {

}


/**
* Start background daemon thread for performing container integrity checks.
*/
@@ -240,13 +252,14 @@ public void stop() {
ContainerMetrics.remove();
}


@VisibleForTesting
public ContainerSet getContainerSet() {
return containerSet;
}

/**
* Returns container report.
*
* @return - container report.
*/

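After this change, OzoneContainer is the only place that still touches StateContext for reporting: the icrSender lambda above builds the IncrementalContainerReportProto and triggers the heartbeat, and every Handler.sendICR call funnels through it. If that lambda were extracted into a package-private factory method (a hypothetical follow-up, not part of this patch), its contract could be verified in isolation with Mockito:

    // Hypothetical: icrSender obtained from an extracted factory method.
    // Static imports from org.mockito.Mockito assumed (mock, when, verify, any).
    StateContext context = mock(StateContext.class);
    DatanodeStateMachine parent = mock(DatanodeStateMachine.class);
    when(context.getParent()).thenReturn(parent);

    icrSender.accept(replica);  // replica: any ContainerReplicaProto

    // Exactly one report queued and one heartbeat triggered.
    verify(context).addReport(any(IncrementalContainerReportProto.class));
    verify(parent).triggerHeartbeat();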
TestHddsDispatcher.java
@@ -36,6 +36,7 @@
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -48,6 +49,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.test.GenericTestUtils;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
@@ -57,6 +59,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
@@ -69,6 +72,9 @@
*/
public class TestHddsDispatcher {

public static final Consumer<ContainerReplicaProto> NO_OP_ICR_SENDER =
c -> {};

@Test
public void testContainerCloseActionWhenFull() throws IOException {
String testDir = GenericTestUtils.getTempPath(
@@ -98,8 +104,9 @@ public void testContainerCloseActionWhenFull() throws IOException {
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
@@ -214,8 +221,9 @@ private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
}

HddsDispatcher hddsDispatcher = new HddsDispatcher(
TestHandler.java
@@ -19,16 +19,19 @@
package org.apache.hadoop.ozone.container.common.interfaces;

import com.google.common.collect.Maps;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -69,7 +72,10 @@ public void setup() throws Exception {
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, context, containerSet, volumeSet, metrics));
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
TestHddsDispatcher.NO_OP_ICR_SENDER));
}
this.dispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, null, metrics, null);