diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 152267366d51..8c441d48ff4b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ipc.Client; @@ -425,62 +423,6 @@ public ContainerWithPipeline getContainerWithPipeline(long containerId) return storageContainerLocationClient.getContainerWithPipeline(containerId); } - /** - * Close a container. - * - * @param pipeline the container to be closed. - * @throws IOException - */ - @Override - public void closeContainer(long containerId, Pipeline pipeline) - throws IOException { - XceiverClientSpi client = null; - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Close container {}", pipeline); - } - /* - TODO: two orders here, revisit this later: - 1. close on SCM first, then on data node - 2. close on data node first, then on SCM - - with 1: if client failed after closing on SCM, then there is a - container SCM thinks as closed, but is actually open. Then SCM will no - longer allocate block to it, which is fine. But SCM may later try to - replicate this "closed" container, which I'm not sure is safe. - - with 2: if client failed after close on datanode, then there is a - container SCM thinks as open, but is actually closed. Then SCM will still - try to allocate block to it. Which will fail when actually doing the - write. No more data can be written, but at least the correctness and - consistency of existing data will maintain. - - For now, take the #2 way. - */ - // Actually close the container on Datanode - client = xceiverClientManager.acquireClient(pipeline); - - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.begin); - - ContainerProtocolCalls.closeContainer(client, containerId, - null); - // Notify SCM to close the container - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.complete); - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client, false); - } - } - } - /** * Close a container. * @@ -489,9 +431,10 @@ public void closeContainer(long containerId, Pipeline pipeline) @Override public void closeContainer(long containerId) throws IOException { - ContainerWithPipeline info = getContainerWithPipeline(containerId); - Pipeline pipeline = info.getPipeline(); - closeContainer(containerId, pipeline); + if (LOG.isDebugEnabled()) { + LOG.debug("Close container {}", containerId); + } + storageContainerLocationClient.closeContainer(containerId); } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 226ceda9255a..8ef63230f4af 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -65,15 +65,6 @@ public interface ScmClient extends Closeable { ContainerWithPipeline getContainerWithPipeline(long containerId) throws IOException; - /** - * Close a container. - * - * @param containerId - ID of the container. - * @param pipeline - Pipeline where the container is located. - * @throws IOException - */ - void closeContainer(long containerId, Pipeline pipeline) throws IOException; - /** * Close a container. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 88db8205a408..12a2046c03ab 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; import java.io.Closeable; import java.io.IOException; @@ -113,18 +111,12 @@ List queryNode(HddsProtos.NodeState state, HddsProtos.QueryScope queryScope, String poolName) throws IOException; /** - * Notify from client when begin or finish creating objects like pipeline - * or containers on datanodes. - * Container will be in Operational state after that. - * @param type object type - * @param id object id - * @param op operation type (e.g., create, close, delete) - * @param stage creation stage + * Close a container. + * + * @param containerID ID of the container to close + * @throws IOException in case of any Exception */ - void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, long id, - ObjectStageChangeRequestProto.Op op, - ObjectStageChangeRequestProto.Stage stage) throws IOException; + void closeContainer(long containerID) throws IOException; /** * Creates a replication pipeline of a specified type. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 01db597dfae1..a264121ec52f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; @@ -257,31 +257,22 @@ public List queryNode(HddsProtos.NodeState } /** - * Notify from client that creates object on datanodes. + * Close a container. * - * @param type object type - * @param id object id - * @param op operation type (e.g., create, close, delete) - * @param stage object creation stage : begin/complete + * @param containerID ID of the container to close + * @throws IOException in case of any Exception */ @Override - public void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, long id, - ObjectStageChangeRequestProto.Op op, - ObjectStageChangeRequestProto.Stage stage) throws IOException { - Preconditions.checkState(id >= 0, - "Object id cannot be negative."); - ObjectStageChangeRequestProto request = - ObjectStageChangeRequestProto.newBuilder() - .setTraceID(TracingUtil.exportCurrentSpan()) - .setType(type) - .setId(id) - .setOp(op) - .setStage(stage) - .build(); - submitRequest(Type.NotifyObjectStageChange, - builder -> builder.setObjectStageChangeRequest(request)); - + public void closeContainer(long containerID) throws IOException { + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); + SCMCloseContainerRequestProto request = SCMCloseContainerRequestProto + .newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setContainerID(containerID) + .build(); + submitRequest(Type.CloseContainer, + builder -> builder.setScmCloseContainerRequest(request)); } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index d03ad157220a..c3e9440425fd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -35,6 +35,7 @@ public enum SCMAction implements AuditAction { CLOSE_PIPELINE, ACTIVATE_PIPELINE, DEACTIVATE_PIPELINE, + CLOSE_CONTAINER, DELETE_CONTAINER, IN_SAFE_MODE, FORCE_EXIT_SAFE_MODE, diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 8ea72b6cd178..91c63d131777 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -47,7 +47,7 @@ message ScmContainerLocationRequest { optional SCMListContainerRequestProto scmListContainerRequest = 9; optional SCMDeleteContainerRequestProto scmDeleteContainerRequest = 10; optional NodeQueryRequestProto nodeQueryRequest = 11; - optional ObjectStageChangeRequestProto objectStageChangeRequest = 12; + optional SCMCloseContainerRequestProto scmCloseContainerRequest = 12; optional PipelineRequestProto pipelineRequest = 13; optional ListPipelineRequestProto listPipelineRequest = 14; optional ActivatePipelineRequestProto activatePipelineRequest = 15; @@ -79,7 +79,7 @@ message ScmContainerLocationResponse { optional SCMListContainerResponseProto scmListContainerResponse = 9; optional SCMDeleteContainerResponseProto scmDeleteContainerResponse = 10; optional NodeQueryResponseProto nodeQueryResponse = 11; - optional ObjectStageChangeResponseProto objectStageChangeResponse = 12; + optional SCMCloseContainerResponseProto scmCloseContainerResponse = 12; optional PipelineResponseProto pipelineResponse = 13; optional ListPipelineResponseProto listPipelineResponse = 14; optional ActivatePipelineResponseProto activatePipelineResponse = 15; @@ -106,7 +106,7 @@ enum Type { ListContainer = 4; DeleteContainer = 5; QueryNode = 6; - NotifyObjectStageChange = 7; + CloseContainer = 7; AllocatePipeline = 8; ListPipelines = 9; ActivatePipeline = 10; @@ -185,28 +185,12 @@ message SCMDeleteContainerResponseProto { // Empty response } -message ObjectStageChangeRequestProto { - enum Type { - container = 1; - pipeline = 2; - } - // delete/copy operation may be added later - enum Op { - create = 1; - close = 2; - } - enum Stage { - begin = 1; - complete = 2; - } - required int64 id = 1; - required Type type = 2; - required Op op= 3; - required Stage stage = 4; - optional string traceID = 5; +message SCMCloseContainerRequestProto { + required int64 containerID = 1; + optional string traceID = 2; } -message ObjectStageChangeResponseProto { +message SCMCloseContainerResponseProto { // Empty response } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 0d2f47000038..0e51dc39e160 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -42,8 +42,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; @@ -151,12 +151,12 @@ public ScmContainerLocationResponse processRequest( .setStatus(Status.OK) .setNodeQueryResponse(queryNode(request.getNodeQueryRequest())) .build(); - case NotifyObjectStageChange: + case CloseContainer: return ScmContainerLocationResponse.newBuilder() .setCmdType(request.getCmdType()) .setStatus(Status.OK) - .setObjectStageChangeResponse(notifyObjectStageChange( - request.getObjectStageChangeRequest())) + .setScmCloseContainerResponse(closeContainer( + request.getScmCloseContainerRequest())) .build(); case ListPipelines: return ScmContainerLocationResponse.newBuilder() @@ -297,12 +297,11 @@ public NodeQueryResponseProto queryNode( } - public ObjectStageChangeResponseProto notifyObjectStageChange( - ObjectStageChangeRequestProto request) + public SCMCloseContainerResponseProto closeContainer( + SCMCloseContainerRequestProto request) throws IOException { - impl.notifyObjectStageChange(request.getType(), request.getId(), - request.getOp(), request.getStage()); - return ObjectStageChangeResponseProto.newBuilder().build(); + impl.closeContainer(request.getContainerID()); + return SCMCloseContainerResponseProto.newBuilder().build(); } public ListPipelineResponseProto listPipelines( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 9c27f6a64d62..80a6a07861f4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.ScmUtils; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -366,33 +367,29 @@ public List queryNode(HddsProtos.NodeState state, } @Override - public void notifyObjectStageChange(StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Type type, long id, - StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op - op, StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Stage stage) throws IOException { - - LOG.info("Object type {} id {} op {} new stage {}", type, id, op, - stage); - if (type == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Type.container) { - if (op == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Op.close) { - if (stage == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Stage.begin) { - scm.getContainerManager() - .updateContainerState(ContainerID.valueof(id), - HddsProtos.LifeCycleEvent.FINALIZE); - } else { - scm.getContainerManager() - .updateContainerState(ContainerID.valueof(id), - HddsProtos.LifeCycleEvent.CLOSE); - } + public void closeContainer(long containerID) throws IOException { + final String remoteUser = getRpcRemoteUsername(); + final Map auditMap = Maps.newHashMap(); + auditMap.put("containerID", String.valueOf(containerID)); + auditMap.put("remoteUser", remoteUser); + try { + scm.checkAdminAccess(remoteUser); + final ContainerID cid = ContainerID.valueof(containerID); + final HddsProtos.LifeCycleState state = scm.getContainerManager() + .getContainer(cid).getState(); + if (!state.equals(HddsProtos.LifeCycleState.OPEN)) { + throw new SCMException("Cannot close a " + state + " container.", + ResultCodes.UNEXPECTED_CONTAINER_STATE); } - } // else if (type == ObjectStageChangeRequestProto.Type.pipeline) { - // TODO: pipeline state update will be addressed in future patch. - // } - + scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER, + ContainerID.valueof(containerID)); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.CLOSE_CONTAINER, auditMap)); + } catch (Exception ex) { + AUDIT.logWriteFailure(buildAuditMessageForFailure( + SCMAction.CLOSE_CONTAINER, auditMap, ex)); + throw ex; + } } @Override