@@ -37,8 +37,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.Client;
@@ -425,62 +423,6 @@ public ContainerWithPipeline getContainerWithPipeline(long containerId)
return storageContainerLocationClient.getContainerWithPipeline(containerId);
}

/**
* Close a container.
*
* @param pipeline the container to be closed.
* @throws IOException
*/
@Override
public void closeContainer(long containerId, Pipeline pipeline)
throws IOException {
XceiverClientSpi client = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Close container {}", pipeline);
}
/*
TODO: two orders here, revisit this later:
1. close on SCM first, then on data node
2. close on data node first, then on SCM

with 1: if client failed after closing on SCM, then there is a
container SCM thinks as closed, but is actually open. Then SCM will no
longer allocate block to it, which is fine. But SCM may later try to
replicate this "closed" container, which I'm not sure is safe.

with 2: if client failed after close on datanode, then there is a
container SCM thinks as open, but is actually closed. Then SCM will still
try to allocate block to it. Which will fail when actually doing the
write. No more data can be written, but at least the correctness and
consistency of existing data will maintain.

For now, take the #2 way.
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);

storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);

ContainerProtocolCalls.closeContainer(client, containerId,
null);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.complete);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}

/**
* Close a container.
*
@@ -489,9 +431,10 @@ public void closeContainer(long containerId, Pipeline pipeline)
@Override
public void closeContainer(long containerId)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerId);
Pipeline pipeline = info.getPipeline();
closeContainer(containerId, pipeline);
if (LOG.isDebugEnabled()) {
LOG.debug("Close container {}", containerId);
}
storageContainerLocationClient.closeContainer(containerId);
}
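
Net effect for callers: closing a container no longer requires resolving a pipeline or talking to datanodes directly; a single SCM RPC drives the whole transition. A minimal usage sketch (the `scmClient` instance and the container ID are illustrative, not part of this patch):

```java
// Illustrative sketch: close a container through the new single-RPC path.
// Assumes `scmClient` is an already-constructed ScmClient implementation,
// e.g. this ContainerOperationClient.
long containerId = 42L; // hypothetical ID of an OPEN container
try {
  scmClient.closeContainer(containerId);
  // SCM checks admin access and the OPEN state, then fires its
  // CLOSE_CONTAINER event; replicas are closed asynchronously.
} catch (IOException e) {
  // Raised if the container is not OPEN or the caller lacks admin access.
}
```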

/**
@@ -65,15 +65,6 @@ public interface ScmClient extends Closeable {
ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException;

/**
* Close a container.
*
* @param containerId - ID of the container.
* @param pipeline - Pipeline where the container is located.
* @throws IOException
*/
void closeContainer(long containerId, Pipeline pipeline) throws IOException;

/**
* Close a container.
*
@@ -23,8 +23,6 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;

import java.io.Closeable;
import java.io.IOException;
@@ -113,18 +111,12 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
HddsProtos.QueryScope queryScope, String poolName) throws IOException;

/**
* Notify from client when begin or finish creating objects like pipeline
* or containers on datanodes.
* Container will be in Operational state after that.
* @param type object type
* @param id object id
* @param op operation type (e.g., create, close, delete)
* @param stage creation stage
* Close a container.
*
* @param containerID ID of the container to close
* @throws IOException in case of any Exception
*/
void notifyObjectStageChange(
ObjectStageChangeRequestProto.Type type, long id,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException;
void closeContainer(long containerID) throws IOException;

/**
* Creates a replication pipeline of a specified type.
@@ -40,7 +40,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
@@ -257,31 +257,22 @@ public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
}

/**
* Notify from client that creates object on datanodes.
* Close a container.
*
* @param type object type
* @param id object id
* @param op operation type (e.g., create, close, delete)
* @param stage object creation stage : begin/complete
* @param containerID ID of the container to close
* @throws IOException in case of any Exception
*/
@Override
public void notifyObjectStageChange(
ObjectStageChangeRequestProto.Type type, long id,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException {
Preconditions.checkState(id >= 0,
"Object id cannot be negative.");
ObjectStageChangeRequestProto request =
ObjectStageChangeRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setType(type)
.setId(id)
.setOp(op)
.setStage(stage)
.build();
submitRequest(Type.NotifyObjectStageChange,
builder -> builder.setObjectStageChangeRequest(request));

public void closeContainer(long containerID) throws IOException {
Preconditions.checkState(containerID >= 0,
"Container ID cannot be negative");
SCMCloseContainerRequestProto request = SCMCloseContainerRequestProto
.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setContainerID(containerID)
.build();
submitRequest(Type.CloseContainer,
builder -> builder.setScmCloseContainerRequest(request));
}

/**
@@ -35,6 +35,7 @@ public enum SCMAction implements AuditAction {
CLOSE_PIPELINE,
ACTIVATE_PIPELINE,
DEACTIVATE_PIPELINE,
CLOSE_CONTAINER,
DELETE_CONTAINER,
IN_SAFE_MODE,
FORCE_EXIT_SAFE_MODE,
@@ -47,7 +47,7 @@ message ScmContainerLocationRequest {
optional SCMListContainerRequestProto scmListContainerRequest = 9;
optional SCMDeleteContainerRequestProto scmDeleteContainerRequest = 10;
optional NodeQueryRequestProto nodeQueryRequest = 11;
optional ObjectStageChangeRequestProto objectStageChangeRequest = 12;
optional SCMCloseContainerRequestProto scmCloseContainerRequest = 12;
optional PipelineRequestProto pipelineRequest = 13;
optional ListPipelineRequestProto listPipelineRequest = 14;
optional ActivatePipelineRequestProto activatePipelineRequest = 15;
@@ -79,7 +79,7 @@ message ScmContainerLocationResponse {
optional SCMListContainerResponseProto scmListContainerResponse = 9;
optional SCMDeleteContainerResponseProto scmDeleteContainerResponse = 10;
optional NodeQueryResponseProto nodeQueryResponse = 11;
optional ObjectStageChangeResponseProto objectStageChangeResponse = 12;
optional SCMCloseContainerResponseProto scmCloseContainerResponse = 12;
optional PipelineResponseProto pipelineResponse = 13;
optional ListPipelineResponseProto listPipelineResponse = 14;
optional ActivatePipelineResponseProto activatePipelineResponse = 15;
@@ -106,7 +106,7 @@ enum Type {
ListContainer = 4;
DeleteContainer = 5;
QueryNode = 6;
NotifyObjectStageChange = 7;
CloseContainer = 7;
Contributor: We should stop doing this after the first Beta. These things break backward compat. Not relevant for this change.

Contributor Author: Agreed.

AllocatePipeline = 8;
ListPipelines = 9;
ActivatePipeline = 10;
@@ -185,28 +185,12 @@ message SCMDeleteContainerResponseProto {
// Empty response
}

message ObjectStageChangeRequestProto {
enum Type {
container = 1;
pipeline = 2;
}
// delete/copy operation may be added later
enum Op {
create = 1;
close = 2;
}
enum Stage {
begin = 1;
complete = 2;
}
required int64 id = 1;
required Type type = 2;
required Op op= 3;
required Stage stage = 4;
optional string traceID = 5;
message SCMCloseContainerRequestProto {
required int64 containerID = 1;
optional string traceID = 2;
}

message ObjectStageChangeResponseProto {
message SCMCloseContainerResponseProto {
// Empty response
}
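
For a quick sanity check of the new message, the generated Java builder mirrors the proto definition above. A hedged sketch (the values and the round-trip assertion are illustrative, not part of this patch):

```java
// Sketch: build and round-trip the new close-container request.
// (parseFrom throws InvalidProtocolBufferException; handle or declare it.)
SCMCloseContainerRequestProto request = SCMCloseContainerRequestProto
    .newBuilder()
    .setContainerID(42L)          // required int64 containerID = 1
    .setTraceID("illustrative")   // optional string traceID = 2
    .build();
SCMCloseContainerRequestProto parsed =
    SCMCloseContainerRequestProto.parseFrom(request.toByteArray());
assert parsed.getContainerID() == 42L; // round trip preserves fields
```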

@@ -42,8 +42,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
@@ -151,12 +151,12 @@ public ScmContainerLocationResponse processRequest(
.setStatus(Status.OK)
.setNodeQueryResponse(queryNode(request.getNodeQueryRequest()))
.build();
case NotifyObjectStageChange:
case CloseContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setObjectStageChangeResponse(notifyObjectStageChange(
request.getObjectStageChangeRequest()))
.setScmCloseContainerResponse(closeContainer(
request.getScmCloseContainerRequest()))
.build();
case ListPipelines:
return ScmContainerLocationResponse.newBuilder()
@@ -297,12 +297,11 @@ public NodeQueryResponseProto queryNode(

}

public ObjectStageChangeResponseProto notifyObjectStageChange(
ObjectStageChangeRequestProto request)
public SCMCloseContainerResponseProto closeContainer(
SCMCloseContainerRequestProto request)
throws IOException {
impl.notifyObjectStageChange(request.getType(), request.getId(),
request.getOp(), request.getStage());
return ObjectStageChangeResponseProto.newBuilder().build();
impl.closeContainer(request.getContainerID());
return SCMCloseContainerResponseProto.newBuilder().build();
}

public ListPipelineResponseProto listPipelines(
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -366,33 +367,29 @@ public List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
}

@Override
public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Type type, long id,
StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op
op, StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Stage stage) throws IOException {

LOG.info("Object type {} id {} op {} new stage {}", type, id, op,
stage);
if (type == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Type.container) {
if (op == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Op.close) {
if (stage == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Stage.begin) {
scm.getContainerManager()
.updateContainerState(ContainerID.valueof(id),
HddsProtos.LifeCycleEvent.FINALIZE);
} else {
scm.getContainerManager()
.updateContainerState(ContainerID.valueof(id),
HddsProtos.LifeCycleEvent.CLOSE);
}
public void closeContainer(long containerID) throws IOException {
final String remoteUser = getRpcRemoteUsername();
final Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("containerID", String.valueOf(containerID));
auditMap.put("remoteUser", remoteUser);
try {
scm.checkAdminAccess(remoteUser);
final ContainerID cid = ContainerID.valueof(containerID);
final HddsProtos.LifeCycleState state = scm.getContainerManager()
.getContainer(cid).getState();
if (!state.equals(HddsProtos.LifeCycleState.OPEN)) {
throw new SCMException("Cannot close a " + state + " container.",
ResultCodes.UNEXPECTED_CONTAINER_STATE);
}
} // else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
// TODO: pipeline state update will be addressed in future patch.
// }

scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
ContainerID.valueof(containerID));
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.CLOSE_CONTAINER, auditMap));
} catch (Exception ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
SCMAction.CLOSE_CONTAINER, auditMap, ex));
throw ex;
}
}
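
One behavioral consequence worth flagging: the OPEN-state precondition makes a repeated close fail fast at SCM instead of being silently re-applied. A hedged client-side sketch (the ID and the timing are illustrative; the exact state in the message depends on how far the asynchronous transition has progressed):

```java
// Illustrative: once the container has left OPEN, a second close
// is rejected at SCM rather than re-applied.
scmClient.closeContainer(42L);   // OPEN container: event fired, audit logged
try {
  scmClient.closeContainer(42L); // retried after the state moved off OPEN
} catch (IOException e) {
  // SCMException with UNEXPECTED_CONTAINER_STATE, e.g.
  // "Cannot close a CLOSING container."
}
```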

@Override